You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:15 UTC

[46/47] flink git commit: [FLINK-3848] [table] follow-up: Refactor TableSource tests.

[FLINK-3848] [table] follow-up: Refactor TableSource tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22af6cf5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22af6cf5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22af6cf5

Branch: refs/heads/master
Commit: 22af6cf57918af645a0bd8282bf1a8e51606f7f0
Parents: ef575e8
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Dec 16 11:19:42 2016 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Dec 16 16:41:21 2016 +0100

----------------------------------------------------------------------
 .../table/api/java/batch/TableSourceITCase.java |  68 +++-----
 .../batch/ProjectableTableSourceITCase.scala    | 146 -----------------
 .../batch/ProjectableTableSourceTest.scala      | 155 -------------------
 .../api/scala/batch/TableSourceITCase.scala     |  99 ++----------
 .../table/api/scala/batch/TableSourceTest.scala | 155 +++++++++++++++++++
 .../api/scala/stream/TableSourceITCase.scala    | 127 ++-------------
 6 files changed, 199 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index c822efb..e5777f2 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -22,10 +22,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.utils.CommonTestData;
 import org.apache.flink.types.Row;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.GeneratingInputFormat;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
@@ -45,20 +44,27 @@ public class TableSourceITCase extends TableProgramsTestBase {
 
 	@Test
 	public void testBatchTableSourceTableAPI() throws Exception {
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableSource csvTable = CommonTestData.getCsvTableSource();
 
-		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+		tableEnv.registerTableSource("persons", csvTable);
 
-		Table result = tableEnv.scan("MyTable")
-			.where("amount < 4")
-			.select("amount * id, name");
+		Table result = tableEnv.scan("persons")
+			.select("id, first, last, score");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
 
-		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
-			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+		String expected = "1,Mike,Smith,12.3\n" +
+			"2,Bob,Taylor,45.6\n" +
+			"3,Sam,Miller,7.89\n" +
+			"4,Peter,Smith,0.12\n" +
+			"5,Liz,Williams,34.5\n" +
+			"6,Sally,Miller,6.78\n" +
+			"7,Alice,Smith,90.1\n" +
+			"8,Kelly,Williams,2.34\n";
 
 		compareResultAsText(results, expected);
 	}
@@ -67,53 +73,23 @@ public class TableSourceITCase extends TableProgramsTestBase {
 	public void testBatchTableSourceSQL() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableSource csvTable = CommonTestData.getCsvTableSource();
 
-		tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
+		tableEnv.registerTableSource("persons", csvTable);
 
 		Table result = tableEnv
-			.sql("SELECT amount * id, name FROM MyTable WHERE amount < 4");
+			.sql("SELECT last, FLOOR(id), score * 2 FROM persons WHERE score < 20");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
 
-		String expected = "0,Record_0\n" + "0,Record_16\n" + "0,Record_32\n" + "1,Record_1\n" +
-			"17,Record_17\n" + "36,Record_18\n" + "4,Record_2\n" + "57,Record_19\n" + "9,Record_3\n";
+		String expected = "Smith,1,24.6\n" +
+			"Miller,3,15.78\n" +
+			"Smith,4,0.24\n" +
+			"Miller,6,13.56\n" +
+			"Williams,8,4.68\n";
 
 		compareResultAsText(results, expected);
 	}
 
-	public static class TestBatchTableSource implements BatchTableSource<Row> {
-
-		private TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
-			BasicTypeInfo.STRING_TYPE_INFO,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			BasicTypeInfo.INT_TYPE_INFO
-		};
-
-		@Override
-		public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-			return execEnv.createInput(new GeneratingInputFormat(33), getReturnType()).setParallelism(1);
-		}
-
-		@Override
-		public int getNumberOfFields() {
-			return 3;
-		}
-
-		@Override
-		public String[] getFieldsNames() {
-			return new String[]{"name", "id", "amount"};
-		}
-
-		@Override
-		public TypeInformation<?>[] getFieldTypes() {
-			return fieldTypes;
-		}
-
-		@Override
-		public TypeInformation<Row> getReturnType() {
-			return new RowTypeInfo(fieldTypes);
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
deleted file mode 100644
index 37407c8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ProjectableTableSourceITCase(mode: TestExecutionMode,
-  configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  private val tableName = "MyTable"
-  private var tableEnv: BatchTableEnvironment = null
-
-  @Before
-  def initTableEnv(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
-  }
-
-  @Test
-  def testTableAPI(): Unit = {
-    val results = tableEnv
-                  .scan(tableName)
-                  .where("amount < 4")
-                  .select("id, name")
-                  .collect()
-
-    val expected = Seq(
-      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
-      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testSQL(): Unit = {
-    val results = tableEnv
-                  .sql(s"select id, name from $tableName where amount < 4 ")
-                  .collect()
-
-    val expected = Seq(
-      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
-      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-class TestProjectableTableSource(
-  fieldTypes: Array[TypeInformation[_]],
-  fieldNames: Array[String])
-  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
-
-  def this() = this(
-    fieldTypes = Array(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO),
-    fieldNames = Array[String]("name", "id", "amount", "price")
-  )
-
-  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
-  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
-    execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = fieldNames
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = fieldNames.length
-
-  override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
-    val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
-    val projectedFieldNames = new Array[String](fields.length)
-
-    fields.zipWithIndex.foreach(f => {
-      projectedFieldTypes(f._2) = fieldTypes(f._1)
-      projectedFieldNames(f._2) = fieldNames(f._1)
-    })
-    new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
-  }
-
-  private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
-    for {cnt <- 0 until num}
-      yield {
-        val row = new Row(fieldNames.length)
-        fieldNames.zipWithIndex.foreach(
-          f =>
-            f._1 match {
-              case "name" =>
-                row.setField(f._2, "Record_" + cnt)
-              case "id" =>
-                row.setField(f._2, cnt.toLong)
-              case "amount" =>
-                row.setField(f._2, cnt.toInt % 16)
-              case "price" =>
-                row.setField(f._2, cnt.toDouble / 3)
-              case _ =>
-                throw new IllegalArgumentException(s"unknown field name $f._1")
-            }
-        )
-        row
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
deleted file mode 100644
index b3097cf..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceTest.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ProjectableTableSourceTest extends TableTestBase {
-
-  private val projectedFields: Array[String] = Array("last", "id", "score")
-  private val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  @Test
-  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last.upperCase(), 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      sourceBatchTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = batchTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceBatchTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .ingest(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanSQL(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-
-    util.tEnv.registerTableSource(tableName, csvTable)
-
-    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      sourceStreamTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (csvTable, tableName) = tableSource
-    val util = streamTestUtil()
-    val tEnv = util.tEnv
-
-    tEnv.registerTableSource(tableName, csvTable)
-
-    val result = tEnv
-      .ingest(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = sourceStreamTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  def tableSource: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index e324aad..a9218ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -18,16 +18,10 @@
 
 package org.apache.flink.table.api.scala.batch
 
-import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.BatchTableSource
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.utils.CommonTestData
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -45,41 +39,6 @@ class TableSourceITCase(
   extends TableProgramsTestBase(mode, configMode) {
 
   @Test
-  def testBatchTableSourceTableAPI(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv
-      .scan("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testBatchTableSourceSQL(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4").collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
   def testCsvTableSource(): Unit = {
 
     val csvTable = CommonTestData.getCsvTableSource
@@ -89,13 +48,17 @@ class TableSourceITCase(
 
     tEnv.registerTableSource("csvTable", csvTable)
     val results = tEnv.sql(
-      "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
+      "SELECT id, first, last, score FROM csvTable").collect()
 
     val expected = Seq(
-      "Smith,102.52,7",
-      "Taylor,45.6,2",
-      "Miller,14.67,6",
-      "Williams,36.84,8").mkString("\n")
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89",
+      "4,Peter,Smith,0.12",
+      "5,Liz,Williams,34.5",
+      "6,Sally,Miller,6.78",
+      "7,Alice,Smith,90.1",
+      "8,Kelly,Williams,2.34").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -110,59 +73,17 @@ class TableSourceITCase(
 
     val results = tEnv
       .scan("csvTable")
+      .where('score < 20)
       .select('last, 'id.floor(), 'score * 2)
       .collect()
 
     val expected = Seq(
       "Smith,1,24.6",
-      "Taylor,2,91.2",
       "Miller,3,15.78",
       "Smith,4,0.24",
-      "Williams,5,69.0",
       "Miller,6,13.56",
-      "Smith,7,180.2",
       "Williams,8,4.68").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
-}
-
-class TestBatchTableSource extends BatchTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
-  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
-    execEnv.createInput(new GeneratingInputFormat(33), getReturnType).setParallelism(1)
-  }
 
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingInputFormat(val num: Int) extends GenericInputFormat[Row] {
-
-  var cnt = 0L
-
-  override def reachedEnd(): Boolean = cnt >= num
-
-  override def nextRecord(reuse: Row): Row = {
-    reuse.setField(0, s"Record_$cnt")
-    reuse.setField(1, cnt)
-    reuse.setField(2, (cnt % 16).toInt)
-    cnt += 1
-    reuse
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
new file mode 100644
index 0000000..55da42a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceTest.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase {
+
+  private val projectedFields: Array[String] = Array("last", "id", "score")
+  private val noCalcFields: Array[String] = Array("id", "score", "first")
+
+  @Test
+  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('last.upperCase(), 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(tableName, projectedFields),
+      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanPlanSQL(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+
+    util.tEnv.registerTableSource(tableName, csvTable)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      sourceBatchTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .scan(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = sourceBatchTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .ingest(tableName)
+      .select('last, 'id.floor(), 'score * 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanPlanSQL(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+
+    util.tEnv.registerTableSource(tableName, csvTable)
+
+    val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      sourceStreamTableNode(tableName, projectedFields),
+      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
+    val (csvTable, tableName) = tableSource
+    val util = streamTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource(tableName, csvTable)
+
+    val result = tEnv
+      .ingest(tableName)
+      .select('id, 'score, 'first)
+
+    val expected = sourceStreamTableNode(tableName, noCalcFields)
+    util.verifyTable(result, expected)
+  }
+
+  def tableSource: (CsvTableSource, String) = {
+    val csvTable = CommonTestData.getCsvTableSource
+    val tableName = "csvTable"
+    (csvTable, tableName)
+  }
+
+  def sourceBatchTableNode(sourceName: String, fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+
+  def sourceStreamTableNode(sourceName: String, fields: Array[String] ): String = {
+    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/22af6cf5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
index 316f2f3..381bd5d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -18,16 +18,9 @@
 
 package org.apache.flink.table.api.scala.stream
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.api.scala._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
@@ -41,51 +34,6 @@ import scala.collection.mutable
 class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
-  def testStreamTableSourceTableAPI(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.ingest("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testStreamTableSourceSQL(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
   def testCsvTableSourceSQL(): Unit = {
 
     val csvTable = CommonTestData.getCsvTableSource
@@ -94,18 +42,19 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
-    tEnv.registerTableSource("csvTable", csvTable)
+    tEnv.registerTableSource("persons", csvTable)
+
     tEnv.sql(
-      "SELECT last, score, id FROM csvTable WHERE id < 4 ")
+      "SELECT id, first, last, score FROM persons WHERE id < 4 ")
       .toDataStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()
 
     val expected = mutable.MutableList(
-      "Smith,12.3,1",
-      "Taylor,45.6,2",
-      "Miller,7.89,3")
+      "1,Mike,Smith,12.3",
+      "2,Bob,Taylor,45.6",
+      "3,Sam,Miller,7.89")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -120,70 +69,18 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("csvTable", csvTable)
     tEnv.ingest("csvTable")
-      .select('last, 'id.floor(), 'score * 2)
+      .where('id > 4)
+      .select('last, 'score * 2)
       .toDataStream[Row]
       .addSink(new StreamITCase.StringSink)
 
     env.execute()
 
     val expected = mutable.MutableList(
-      "Smith,1,24.6",
-      "Taylor,2,91.2",
-      "Miller,3,15.78",
-      "Smith,4,0.24",
-      "Williams,5,69.0",
-      "Miller,6,13.56",
-      "Smith,7,180.2",
-      "Williams,8,4.68")
+      "Williams,69.0",
+      "Miller,13.56",
+      "Smith,180.2",
+      "Williams,4.68")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 }
-
-
-class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[DataStream]]. */
-  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
-
-  var running = true
-
-  override def run(ctx: SourceContext[Row]): Unit = {
-    var cnt = 0L
-    while(running && cnt < num) {
-      val out = new Row(3)
-      out.setField(0, s"Record_$cnt")
-      out.setField(1, cnt)
-      out.setField(2, (cnt % 16).toInt)
-
-      ctx.collect(out)
-      cnt += 1
-    }
-  }
-
-  override def cancel(): Unit = {
-    running = false
-  }
-}