Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:15 UTC
[46/47] flink git commit: [FLINK-3848] [table] follow-up: Refactor TableSource tests.
[FLINK-3848] [table] follow-up: Refactor TableSource tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22af6cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22af6cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22af6cf5
Branch: refs/heads/master
Commit: 22af6cf57918af645a0bd8282bf1a8e51606f7f0
Parents: ef575e8
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Dec 16 11:19:42 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100
----------------------------------------------------------------------
.../table/api/java/batch/TableSourceITCase.java | 68 +++-----
.../batch/ProjectableTableSourceITCase.scala | 146 -----------------
.../batch/ProjectableTableSourceTest.scala | 155 -------------------
.../api/scala/batch/TableSourceITCase.scala | 99 ++----------
.../table/api/scala/batch/TableSourceTest.scala | 155 +++++++++++++++++++
.../api/scala/stream/TableSourceITCase.scala | 127 ++-------------
6 files changed, 199 insertions(+), 551 deletions(-)
----------------------------------------------------------------------
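The refactoring removes the hand-rolled test sources (TestBatchTableSource, TestProjectableTableSource, GeneratingInputFormat, TestStreamTableSource, GeneratingSourceFunction) and has every ITCase register the shared CsvTableSource returned by CommonTestData.getCsvTableSource. A minimal sketch of the batch pattern the refactored tests now follow is below; the object name is illustrative, while the field names (id, first, last, score) and the query shape are taken from the expected results in this diff:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.CommonTestData

    object CsvSourceSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // One shared CSV-backed source replaces the per-test generated sources.
        tEnv.registerTableSource("persons", CommonTestData.getCsvTableSource)

        // Same query shape as the refactored batch ITCases.
        val results = tEnv
          .scan("persons")
          .where('score < 20)
          .select('last, 'id.floor(), 'score * 2)
          .collect()

        results.foreach(println)
      }
    }
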
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index c822efb..e5777f2 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -22,10 +22,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.utils.CommonTestData;
import org.apache.flink.types.Row;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.GeneratingInputFormat;
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
@@ -45,20 +44,27 @@ public class TableSourceITCase extends TableProgramsTestBase {
@Test
public void testBatchTableSourceTableAPI() throws Exception {
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableSource csvTable = CommonTestData.getCsvTableSource();
- tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+ tableEnv.registerTableSource("persons", csvTable);
- Table result = tableEnv.scan("MyTable")
- .where("amount < 4")
- .select("amount * id, name");
+ Table result = tableEnv.scan("persons")
+ .select("id, first, last, score");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
- String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
- "17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+ String expected = "1,Mike,Smith,12.3\n" +
+ "2,Bob,Taylor,45.6\n" +
+ "3,Sam,Miller,7.89\n" +
+ "4,Peter,Smith,0.12\n" +
+ "5,Liz,Williams,34.5\n" +
+ "6,Sally,Miller,6.78\n" +
+ "7,Alice,Smith,90.1\n" +
+ "8,Kelly,Williams,2.34\n";
compareResultAsText(results, expected);
}
@@ -67,53 +73,23 @@ public class TableSourceITCase extends TableProgramsTestBase {
public void testBatchTableSourceSQL() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableSource csvTable = CommonTestData.getCsvTableSource();
- tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+ tableEnv.registerTableSource("persons", csvTable);
Table result = tableEnv
- .sql("SELECT amount * id, name FROM MyTable WHERE amount < 4");
+ .sql("SELECT last, FLOOR(id), score * 2 FROM persons WHERE score < 20");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
- String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
- "17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+ String expected = "Smith,1,24.6\n" +
+ "Miller,3,15.78\n" +
+ "Smith,4,0.24\n" +
+ "Miller,6,13.56\n" +
+ "Williams,8,4.68\n";
compareResultAsText(results, expected);
}
- public static class TestBatchTableSource implements BatchTableSource<Row> {
-
- private TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO
- };
-
- @Override
- public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
- return execEnv.createInput(new GeneratingInputFormat(33), getReturnType()).setParallelism(1);
- }
-
- @Override
- public int getNumberOfFields() {
- return 3;
- }
-
- @Override
- public String[] getFieldsNames() {
- return new String[]{"name", "id", "amount"};
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
- }
-
- @Override
- public TypeInformation<Row> getReturnType() {
- return new RowTypeInfo(fieldTypes);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
deleted file mode 100644
index 37407c8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ProjectableTableSourceITCase(mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- private val tableName = "MyTable"
- private var tableEnv: BatchTableEnvironment = null
-
- @Before
- def initTableEnv(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- tableEnv = TableEnvironment.getTableEnvironment(env, config)
- tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
- }
-
- @Test
- def testTableAPI(): Unit = {
- val results = tableEnv
- .scan(tableName)
- .where("amount < 4")
- .select("id, name")
- .collect()
-
- val expected = Seq(
- "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
- "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testSQL(): Unit = {
- val results = tableEnv
- .sql(s"select id, name from $tableName where amount < 4 ")
- .collect()
-
- val expected = Seq(
- "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
- "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-}
-
-class TestProjectableTableSource(
- fieldTypes: Array[TypeInformation[_]],
- fieldNames: Array[String])
- extends BatchTableSource[Row] with ProjectableTableSource[Row] {
-
- def this() = this(
- fieldTypes = Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO),
- fieldNames = Array[String]("name", "id", "amount", "price")
- )
-
- /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
- override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
- execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
- }
-
- /** Returns the types of the table fields. */
- override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
- /** Returns the names of the table fields. */
- override def getFieldsNames: Array[String] = fieldNames
-
- /** Returns the [[TypeInformation]] for the return type. */
- override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
- /** Returns the number of fields of the table. */
- override def getNumberOfFields: Int = fieldNames.length
-
- override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
- val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
- val projectedFieldNames = new Array[String](fields.length)
-
- fields.zipWithIndex.foreach(f => {
- projectedFieldTypes(f._2) = fieldTypes(f._1)
- projectedFieldNames(f._2) = fieldNames(f._1)
- })
- new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
- }
-
- private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
- for {cnt <- 0 until num}
- yield {
- val row = new Row(fieldNames.length)
- fieldNames.zipWithIndex.foreach(
- f =>
- f._1 match {
- case "name" =>
- row.setField(f._2, "Record_" + cnt)
- case "id" =>
- row.setField(f._2, cnt.toLong)
- case "amount" =>
- row.setField(f._2, cnt.toInt % 16)
- case "price" =>
- row.setField(f._2, cnt.toDouble / 3)
- case _ =>
- throw new IllegalArgumentException(s"unknown field name $f._1")
- }
- )
- row
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
deleted file mode 100644
index b3097cf..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ProjectableTableSourceTest extends TableTestBase {
-
- private val projectedFields: Array[String] = Array("last", "id", "score")
- private val noCalcFields: Array[String] = Array("id", "score", "first")
-
- @Test
- def testBatchProjectableSourceScanPlanTableApi(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = batchTestUtil()
- val tEnv = util.tEnv
-
- tEnv.registerTableSource(tableName, csvTable)
-
- val result = tEnv
- .scan(tableName)
- .select('last.upperCase(), 'id.floor(), 'score * 2)
-
- val expected = unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(tableName, projectedFields),
- term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchProjectableSourceScanPlanSQL(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = batchTestUtil()
-
- util.tEnv.registerTableSource(tableName, csvTable)
-
- val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
- val expected = unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = batchTestUtil()
- val tEnv = util.tEnv
-
- tEnv.registerTableSource(tableName, csvTable)
-
- val result = tEnv
- .scan(tableName)
- .select('id, 'score, 'first)
-
- val expected = sourceBatchTableNode(tableName, noCalcFields)
- util.verifyTable(result, expected)
- }
-
- @Test
- def testStreamProjectableSourceScanPlanTableApi(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = streamTestUtil()
- val tEnv = util.tEnv
-
- tEnv.registerTableSource(tableName, csvTable)
-
- val result = tEnv
- .ingest(tableName)
- .select('last, 'id.floor(), 'score * 2)
-
- val expected = unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testStreamProjectableSourceScanPlanSQL(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = streamTestUtil()
-
- util.tEnv.registerTableSource(tableName, csvTable)
-
- val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
- val expected = unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
- val (csvTable, tableName) = tableSource
- val util = streamTestUtil()
- val tEnv = util.tEnv
-
- tEnv.registerTableSource(tableName, csvTable)
-
- val result = tEnv
- .ingest(tableName)
- .select('id, 'score, 'first)
-
- val expected = sourceStreamTableNode(tableName, noCalcFields)
- util.verifyTable(result, expected)
- }
-
- def tableSource: (CsvTableSource, String) = {
- val csvTable = CommonTestData.getCsvTableSource
- val tableName = "csvTable"
- (csvTable, tableName)
- }
-
- def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
- s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-
- def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
- s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index e324aad..a9218ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -18,16 +18,10 @@
package org.apache.flink.table.api.scala.batch
-import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.utils.CommonTestData
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -45,41 +39,6 @@ class TableSourceITCase(
extends TableProgramsTestBase(mode, configMode) {
@Test
- def testBatchTableSourceTableAPI(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
- val results = tEnv
- .scan("MyTestTable")
- .where('amount < 4)
- .select('amount * 'id, 'name)
- .collect()
-
- val expected = Seq(
- "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
- "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testBatchTableSourceSQL(): Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
- val results = tEnv.sql(
- "SELECT amount * id, name FROM MyTestTable WHERE amount < 4").collect()
-
- val expected = Seq(
- "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
- "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
def testCsvTableSource(): Unit = {
val csvTable = CommonTestData.getCsvTableSource
@@ -89,13 +48,17 @@ class TableSourceITCase(
tEnv.registerTableSource("csvTable", csvTable)
val results = tEnv.sql(
- "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
+ "SELECT id, first, last, score FROM csvTable").collect()
val expected = Seq(
- "Smith,102.52,7",
- "Taylor,45.6,2",
- "Miller,14.67,6",
- "Williams,36.84,8").mkString("\n")
+ "1,Mike,Smith,12.3",
+ "2,Bob,Taylor,45.6",
+ "3,Sam,Miller,7.89",
+ "4,Peter,Smith,0.12",
+ "5,Liz,Williams,34.5",
+ "6,Sally,Miller,6.78",
+ "7,Alice,Smith,90.1",
+ "8,Kelly,Williams,2.34").mkString("\n")
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
@@ -110,59 +73,17 @@ class TableSourceITCase(
val results = tEnv
.scan("csvTable")
+ .where('score < 20)
.select('last, 'id.floor(), 'score * 2)
.collect()
val expected = Seq(
"Smith,1,24.6",
- "Taylor,2,91.2",
"Miller,3,15.78",
"Smith,4,0.24",
- "Williams,5,69.0",
"Miller,6,13.56",
- "Smith,7,180.2",
"Williams,8,4.68").mkString("\n")
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
-}
-
-class TestBatchTableSource extends BatchTableSource[Row] {
-
- val fieldTypes: Array[TypeInformation[_]] = Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO
- )
-
- /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
- override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
- execEnv.createInput(new GeneratingInputFormat(33), getReturnType).setParallelism(1)
- }
- /** Returns the types of the table fields. */
- override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
- /** Returns the names of the table fields. */
- override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
- /** Returns the [[TypeInformation]] for the return type. */
- override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
- /** Returns the number of fields of the table. */
- override def getNumberOfFields: Int = 3
-}
-
-class GeneratingInputFormat(val num: Int) extends GenericInputFormat[Row] {
-
- var cnt = 0L
-
- override def reachedEnd(): Boolean = cnt >= num
-
- override def nextRecord(reuse: Row): Row = {
- reuse.setField(0, s"Record_$cnt")
- reuse.setField(1, cnt)
- reuse.setField(2, (cnt % 16).toInt)
- cnt += 1
- reuse
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
new file mode 100644
index 0000000..55da42a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase {
+
+ private val projectedFields: Array[String] = Array("last", "id", "score")
+ private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+ @Test
+ def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .scan(tableName)
+ .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(tableName, projectedFields),
+ term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testBatchProjectableSourceScanPlanSQL(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+
+ util.tEnv.registerTableSource(tableName, csvTable)
+
+ val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = batchTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .scan(tableName)
+ .select('id, 'score, 'first)
+
+ val expected = sourceBatchTableNode(tableName, noCalcFields)
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .ingest(tableName)
+ .select('last, 'id.floor(), 'score * 2)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanPlanSQL(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+
+ util.tEnv.registerTableSource(tableName, csvTable)
+
+ val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(tableName, projectedFields),
+ term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+ val (csvTable, tableName) = tableSource
+ val util = streamTestUtil()
+ val tEnv = util.tEnv
+
+ tEnv.registerTableSource(tableName, csvTable)
+
+ val result = tEnv
+ .ingest(tableName)
+ .select('id, 'score, 'first)
+
+ val expected = sourceStreamTableNode(tableName, noCalcFields)
+ util.verifyTable(result, expected)
+ }
+
+ def tableSource: (CsvTableSource, String) = {
+ val csvTable = CommonTestData.getCsvTableSource
+ val tableName = "csvTable"
+ (csvTable, tableName)
+ }
+
+ def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
+ s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ }
+
+ def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
+ s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 316f2f3..381bd5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -18,16 +18,9 @@
package org.apache.flink.table.api.scala.stream
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.table.api.scala.stream.utils.StreamITCase
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.api.scala._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.TableEnvironment
@@ -41,51 +34,6 @@ import scala.collection.mutable
class TableSourceITCase extends StreamingMultipleProgramsTestBase {
@Test
- def testStreamTableSourceTableAPI(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
- tEnv.ingest("MyTestTable")
- .where('amount < 4)
- .select('amount * 'id, 'name)
- .toDataStream[Row]
- .addSink(new StreamITCase.StringSink)
-
- env.execute()
-
- val expected = mutable.MutableList(
- "0,Record_0", "0,Record_16", "0,Record_32",
- "1,Record_1", "17,Record_17", "36,Record_18",
- "4,Record_2", "57,Record_19", "9,Record_3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testStreamTableSourceSQL(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
- tEnv.sql(
- "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
- .toDataStream[Row]
- .addSink(new StreamITCase.StringSink)
-
- env.execute()
-
- val expected = mutable.MutableList(
- "0,Record_0", "0,Record_16", "0,Record_32",
- "1,Record_1", "17,Record_17", "36,Record_18",
- "4,Record_2", "57,Record_19", "9,Record_3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
def testCsvTableSourceSQL(): Unit = {
val csvTable = CommonTestData.getCsvTableSource
@@ -94,18 +42,19 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- tEnv.registerTableSource("csvTable", csvTable)
+ tEnv.registerTableSource("persons", csvTable)
+
tEnv.sql(
- "SELECT last, score, id FROM csvTable WHERE id < 4 ")
+ "SELECT id, first, last, score FROM persons WHERE id < 4 ")
.toDataStream[Row]
.addSink(new StreamITCase.StringSink)
env.execute()
val expected = mutable.MutableList(
- "Smith,12.3,1",
- "Taylor,45.6,2",
- "Miller,7.89,3")
+ "1,Mike,Smith,12.3",
+ "2,Bob,Taylor,45.6",
+ "3,Sam,Miller,7.89")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -120,70 +69,18 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
tEnv.registerTableSource("csvTable", csvTable)
tEnv.ingest("csvTable")
- .select('last, 'id.floor(), 'score * 2)
+ .where('id > 4)
+ .select('last, 'score * 2)
.toDataStream[Row]
.addSink(new StreamITCase.StringSink)
env.execute()
val expected = mutable.MutableList(
- "Smith,1,24.6",
- "Taylor,2,91.2",
- "Miller,3,15.78",
- "Smith,4,0.24",
- "Williams,5,69.0",
- "Miller,6,13.56",
- "Smith,7,180.2",
- "Williams,8,4.68")
+ "Williams,69.0",
+ "Miller,13.56",
+ "Smith,180.2",
+ "Williams,4.68")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
-
-
-class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
-
- val fieldTypes: Array[TypeInformation[_]] = Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO
- )
-
- /** Returns the data of the table as a [[DataStream]]. */
- override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
- execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
- }
-
- /** Returns the types of the table fields. */
- override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
- /** Returns the names of the table fields. */
- override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
- /** Returns the [[TypeInformation]] for the return type. */
- override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
- /** Returns the number of fields of the table. */
- override def getNumberOfFields: Int = 3
-}
-
-class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
-
- var running = true
-
- override def run(ctx: SourceContext[Row]): Unit = {
- var cnt = 0L
- while(running && cnt < num) {
- val out = new Row(3)
- out.setField(0, s"Record_$cnt")
- out.setField(1, cnt)
- out.setField(2, (cnt % 16).toInt)
-
- ctx.collect(out)
- cnt += 1
- }
- }
-
- override def cancel(): Unit = {
- running = false
- }
-}
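
The streaming ITCase follows the same consolidation. A minimal sketch of its pattern, with print() standing in for the StreamITCase.StringSink test sink and an illustrative object name; the ingest/where/select/toDataStream chain mirrors the test above:

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.CommonTestData
    import org.apache.flink.types.Row

    object CsvStreamSourceSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Register the shared CSV source instead of a generated stream source.
        tEnv.registerTableSource("persons", CommonTestData.getCsvTableSource)

        // Same query shape as the refactored stream ITCase, printed instead of
        // collected through the test sink.
        tEnv.ingest("persons")
          .where('id > 4)
          .select('last, 'score * 2)
          .toDataStream[Row]
          .print()

        env.execute()
      }
    }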