You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/30 13:20:48 UTC

[2/4] flink git commit: [FLINK-7548] [table] Refactor TableSource interface.

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index 6321e09..4b88bc3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -131,7 +131,9 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataSetCalc",
-      batchSourceTableNode(tableName, Array("first")),
+      s"BatchTableSourceScan(table=[[$tableName]], " +
+        s"fields=[], " +
+        s"source=[CsvTableSource(read fields: first)])",
       term("select", "1 AS _c0")
     )
 
@@ -153,9 +155,7 @@ class TableSourceTest extends TableTestBase {
 
     val expected = unaryNode(
       "DataSetCalc",
-      batchSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price")),
+      "BatchTableSourceScan(table=[[filterableTable]], fields=[price, id, amount])",
       term("select", "price", "id", "amount"),
       term("where", "<(*(price, 2), 32)")
     )
@@ -180,7 +180,7 @@ class TableSourceTest extends TableTestBase {
       "DataSetCalc",
       batchFilterableSourceTableNode(
         tableName,
-        Array("name", "id", "amount", "price"),
+        Array("price", "name", "amount"),
         "'amount > 2"),
       term("select", "price", "LOWER(name) AS _c1", "amount"),
       term("where", "<(*(price, 2), 32)")
@@ -201,14 +201,10 @@ class TableSourceTest extends TableTestBase {
         .select('price, 'id, 'amount)
         .where("amount > 2 && amount < 32")
 
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2 && 'amount < 32"),
-      term("select", "price", "id", "amount")
-    )
+    val expected = batchFilterableSourceTableNode(
+      tableName,
+      Array("price", "id", "amount"),
+      "'amount > 2 && 'amount < 32")
     util.verifyTable(result, expected)
   }
 
@@ -229,7 +225,7 @@ class TableSourceTest extends TableTestBase {
       "DataSetCalc",
       batchFilterableSourceTableNode(
         tableName,
-        Array("name", "id", "amount", "price"),
+        Array("price", "id", "amount"),
         "'amount > 2"),
       term("select", "price", "id", "amount"),
       term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
@@ -256,7 +252,7 @@ class TableSourceTest extends TableTestBase {
       "DataSetCalc",
       batchFilterableSourceTableNode(
         tableName,
-        Array("name", "id", "amount", "price"),
+        Array("price", "id", "amount"),
         "'amount > 2"),
       term("select", "price", "id", "amount"),
       term("where", s"<(${Func0.getClass.getSimpleName}(amount), 32)")
@@ -339,7 +335,7 @@ class TableSourceTest extends TableTestBase {
       "DataStreamCalc",
       streamFilterableSourceTableNode(
         tableName,
-        Array("name", "id", "amount", "price"),
+        Array("price", "id", "amount"),
         "'amount > 2"),
       term("select", "price", "id", "amount"),
       term("where", "<(*(price, 2), 32)")
@@ -392,11 +388,15 @@ class TableSourceTest extends TableTestBase {
   }
 
   def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+    s"BatchTableSourceScan(table=[[$sourceName]], " +
+      s"fields=[${fields.mkString(", ")}], " +
+      s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
   }
 
   def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
+    s"StreamTableSourceScan(table=[[$sourceName]], " +
+      s"fields=[${fields.mkString(", ")}], " +
+      s"source=[CsvTableSource(read fields: ${fields.mkString(", ")})])"
   }
 
   def batchFilterableSourceTableNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
index be073bd..c53f5ac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TableSourceTest.scala
@@ -20,78 +20,74 @@ package org.apache.flink.table.api.stream.table
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources._
 import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.{TableTestBase, TestNestedProjectableTableSource, TestProjectableTableSource, TestTableSourceWithTime}
 import org.apache.flink.types.Row
-import org.junit.{Assert, Test}
+import org.junit.Test
 
 class TableSourceTest extends TableTestBase {
 
   @Test
   def testTableSourceWithLongRowTimeField(): Unit = {
 
-    val tableSource = new TestRowtimeSource(
+    val tableSchema = new TableSchema(
       Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+    val returnType = new RowTypeInfo(
       Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
         .asInstanceOf[Array[TypeInformation[_]]],
-      "rowtime"
-    )
+      Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", tableSource)
+    util.tableEnv.registerTableSource(
+      "rowTimeT",
+      new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
     val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
 
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
-        term("select", "rowtime", "id", "name", "val")
-      )
+    val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])"
     util.verifyTable(t, expected)
   }
 
   @Test
   def testTableSourceWithTimestampRowTimeField(): Unit = {
 
-    val tableSource = new TestRowtimeSource(
+    val tableSchema = new TableSchema(
       Array("id", "rowtime", "val", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+    val returnType = new RowTypeInfo(
       Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
         .asInstanceOf[Array[TypeInformation[_]]],
-      "rowtime"
-    )
+      Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", tableSource)
+    util.tableEnv.registerTableSource(
+      "rowTimeT",
+      new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
     val t = util.tableEnv.scan("rowTimeT").select("rowtime, id, name, val")
 
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
-        term("select", "rowtime", "id", "name", "val")
-      )
+    val expected = "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, id, name, val])"
     util.verifyTable(t, expected)
   }
 
   @Test
   def testRowTimeTableSourceGroupWindow(): Unit = {
 
-    val tableSource = new TestRowtimeSource(
+    val tableSchema = new TableSchema(
       Array("id", "rowtime", "val", "name"),
-      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING)
         .asInstanceOf[Array[TypeInformation[_]]],
-      "rowtime"
-    )
+      Array("id", "rowtime", "val", "name"))
 
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTimeT", tableSource)
+    util.tableEnv.registerTableSource(
+      "rowTimeT",
+      new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), rowtime = "rowtime"))
 
     val t = util.tableEnv.scan("rowTimeT")
       .filter("val > 100")
@@ -106,8 +102,8 @@ class TableSourceTest extends TableTestBase {
           "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
-            "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, rowtime, val, name])",
-            term("select", "name", "val", "rowtime"),
+            "StreamTableSourceScan(table=[[rowTimeT]], fields=[rowtime, val, name])",
+            term("select", "rowtime", "val", "name"),
             term("where", ">(val, 100)")
           ),
           term("groupBy", "name"),
@@ -121,27 +117,47 @@ class TableSourceTest extends TableTestBase {
 
   @Test
   def testProcTimeTableSourceSimple(): Unit = {
+
+    val tableSchema = new TableSchema(
+      Array("id", "proctime", "val", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+    util.tableEnv.registerTableSource(
+      "procTimeT",
+      new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
 
-    val t = util.tableEnv.scan("procTimeT").select("pTime, id, name, val")
+    val t = util.tableEnv.scan("procTimeT").select("proctime, id, name, val")
 
     val expected =
       unaryNode(
         "DataStreamCalc",
-        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
-        term("select", "PROCTIME(pTime) AS pTime", "id", "name", "val")
+        "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])",
+        term("select", "PROCTIME(proctime) AS proctime", "id", "name", "val")
       )
     util.verifyTable(t, expected)
   }
 
   @Test
   def testProcTimeTableSourceOverWindow(): Unit = {
+
+    val tableSchema = new TableSchema(
+      Array("id", "proctime", "val", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "val", "name"))
+
     val util = streamTestUtil()
-    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource("pTime"))
+    util.tableEnv.registerTableSource(
+      "procTimeT",
+      new TestTableSourceWithTime[Row](tableSchema, returnType, Seq(), proctime = "proctime"))
 
     val t = util.tableEnv.scan("procTimeT")
-      .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+      .window(Over partitionBy 'id orderBy 'proctime preceding 2.hours as 'w)
       .select('id, 'name, 'val.sum over 'w as 'valSum)
       .filter('valSum > 100)
 
@@ -150,11 +166,11 @@ class TableSourceTest extends TableTestBase {
         "DataStreamCalc",
         unaryNode(
           "DataStreamOverAggregate",
-          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+          "StreamTableSourceScan(table=[[procTimeT]], fields=[id, proctime, val, name])",
           term("partitionBy", "id"),
-          term("orderBy", "pTime"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+          term("select", "id", "proctime", "val", "name", "SUM(val) AS w0$o0")
         ),
         term("select", "id", "name", "w0$o0 AS valSum"),
         term("where", ">(w0$o0, 100)")
@@ -163,94 +179,195 @@ class TableSourceTest extends TableTestBase {
   }
 
   @Test
-  def testProjectableProcTimeTableSource(): Unit = {
-    // ensures that projection is not pushed into table source with proctime indicators
-    val util = streamTestUtil()
+  def testProjectWithRowtimeProctime(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "val", "rtime"))
 
-    val projectableTableSource = new TestProctimeSource("pTime") with ProjectableTableSource[Row] {
-      override def projectFields(fields: Array[Int]): TableSource[Row] = {
-        // ensure this method is not called!
-        Assert.fail()
-        null.asInstanceOf[TableSource[Row]]
-      }
-    }
-    util.tableEnv.registerTableSource("PTimeTable", projectableTableSource)
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
-    val t = util.tableEnv.scan("PTimeTable")
-      .select('name, 'val)
-      .where('val > 10)
+    val t = util.tableEnv.scan("T").select('name, 'val, 'id)
 
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        "StreamTableSourceScan(table=[[PTimeTable]], fields=[id, val, name, pTime])",
-        term("select", "name", "val"),
-        term("where", ">(val, 10)")
-      )
+    val expected = "StreamTableSourceScan(table=[[T]], " +
+      "fields=[name, val, id], " +
+      "source=[TestSource(physical fields: name, val, id)])"
     util.verifyTable(t, expected)
   }
 
   @Test
-  def testProjectableRowTimeTableSource(): Unit = {
-    // ensures that projection is not pushed into table source with rowtime indicators
+  def testProjectWithoutRowtime(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "val", "rtime"))
+
     val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
+
+    val t = util.tableEnv.scan("T").select('ptime, 'name, 'val, 'id)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      "StreamTableSourceScan(table=[[T]], " +
+        "fields=[ptime, name, val, id], " +
+        "source=[TestSource(physical fields: name, val, id)])",
+      term("select", "PROCTIME(ptime) AS ptime", "name", "val", "id")
+    )
+    util.verifyTable(t, expected)
+  }
 
-    val projectableTableSource = new TestRowtimeSource(
-        Array("id", "rowtime", "val", "name"),
-        Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
-          .asInstanceOf[Array[TypeInformation[_]]],
-        "rowtime") with ProjectableTableSource[Row] {
+  @Test def testProjectWithoutProctime(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "rtime", "val", "name"))
 
-      override def projectFields(fields: Array[Int]): TableSource[Row] = {
-        // ensure this method is not called!
-        Assert.fail()
-        null.asInstanceOf[TableSource[Row]]
-      }
-    }
-    util.tableEnv.registerTableSource("RTimeTable", projectableTableSource)
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
-    val t = util.tableEnv.scan("RTimeTable")
-      .select('name, 'val)
-      .where('val > 10)
+    val t = util.tableEnv.scan("T").select('name, 'val, 'rtime, 'id)
 
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        "StreamTableSourceScan(table=[[RTimeTable]], fields=[id, rowtime, val, name])",
-        term("select", "name", "val"),
-        term("where", ">(val, 10)")
-      )
+    val expected = "StreamTableSourceScan(table=[[T]], " +
+      "fields=[name, val, rtime, id], " +
+      "source=[TestSource(physical fields: name, val, rtime, id)])"
     util.verifyTable(t, expected)
   }
-}
 
-class TestRowtimeSource(
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]],
-    rowtimeField: String)
-  extends StreamTableSource[Row] with DefinedRowtimeAttribute {
+  @Test def testProjectOnlyProctime(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "rtime", "val", "name"))
 
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
-  override def getRowtimeAttribute: String = rowtimeField
+    val t = util.tableEnv.scan("T").select('ptime)
 
-  override def getReturnType: TypeInformation[Row] = {
-    new RowTypeInfo(fieldTypes, fieldNames)
+    val expected = "StreamTableSourceScan(table=[[T]], " +
+      "fields=[ptime], " +
+      "source=[TestSource(physical fields: )])"
+    util.verifyTable(t, expected)
   }
-}
 
-class TestProctimeSource(timeField: String)
-    extends StreamTableSource[Row] with DefinedProctimeAttribute {
+  @Test def testProjectOnlyRowtime(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "rtime", "val", "name"))
 
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime"))
 
-  override def getProctimeAttribute: String = timeField
+    val t = util.tableEnv.scan("T").select('rtime)
 
-  override def getReturnType: TypeInformation[Row] = {
-    new RowTypeInfo(
-      Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "val", "name"))
+    val expected = "StreamTableSourceScan(table=[[T]], " +
+      "fields=[rtime], " +
+      "source=[TestSource(physical fields: rtime)])"
+    util.verifyTable(t, expected)
   }
-}
 
+  @Test
+  def testProjectWithMapping(): Unit = {
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.LONG, Types.INT, Types.STRING, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("p-rtime", "p-id", "p-name", "p-val"))
+    val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
+
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, Seq(), "rtime", "ptime", mapping))
+
+    val t = util.tableEnv.scan("T").select('name, 'rtime, 'val)
 
+    val expected = "StreamTableSourceScan(table=[[T]], " +
+      "fields=[name, rtime, val], " +
+      "source=[TestSource(physical fields: remapped-p-name, remapped-p-rtime, remapped-p-val)])"
+    util.verifyTable(t, expected)
+  }
+
+  @Test
+  def testNestedProject(): Unit = {
+
+    val nested1 = new RowTypeInfo(
+      Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("name", "value")
+    )
+
+    val nested2 = new RowTypeInfo(
+      Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]],
+      Array("num", "flag")
+    )
+
+    val deepNested = new RowTypeInfo(
+      Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]],
+      Array("nested1", "nested2")
+    )
+
+    val tableSchema = new TableSchema(
+      Array("id", "deepNested", "nested", "name"),
+      Array(Types.INT, deepNested, nested1, Types.STRING))
+
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+        Array("id", "deepNested", "nested", "name"))
+
+    val util = streamTestUtil()
+    util.tableEnv.registerTableSource(
+      "T",
+      new TestNestedProjectableTableSource(tableSchema, returnType, Seq()))
+
+    val t = util.tableEnv
+      .scan("T")
+      .select('id,
+        'deepNested.get("nested1").get("name") as 'nestedName,
+        'nested.get("value") as 'nestedValue,
+        'deepNested.get("nested2").get("flag") as 'nestedFlag,
+        'deepNested.get("nested2").get("num") as 'nestedNum)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      "StreamTableSourceScan(table=[[T]], " +
+        "fields=[id, deepNested, nested], " +
+        "source=[TestSource(read nested fields: " +
+          "id.*, deepNested.nested2.num, deepNested.nested2.flag, " +
+          "deepNested.nested1.name, nested.value)])",
+      term("select", "id", "deepNested.nested1.name AS nestedName", "nested.value AS nestedValue",
+        "deepNested.nested2.flag AS nestedFlag", "deepNested.nested2.num AS nestedNum")
+    )
+    util.verifyTable(t, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
deleted file mode 100644
index 80f1725..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TableSourceValidationTest.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.stream.table.validation
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{TableException, Types}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.stream.table.{TestProctimeSource, TestRowtimeSource}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class TableSourceValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[TableException])
-  def testRowtimeTableSourceWithEmptyName(): Unit = {
-
-    val tableSource = new TestRowtimeSource(
-      Array("id", "rowtime", "val", "name"),
-      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
-        .asInstanceOf[Array[TypeInformation[_]]],
-      "rowtime"
-    )
-
-    val util = streamTestUtil()
-    util.tableEnv.registerTableSource("rowTime", tableSource)
-
-    val t = util.tableEnv.scan("rowTimeT")
-            .select('id)
-
-    util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testProctimeTableSourceWithEmptyName(): Unit = {
-    val util = streamTestUtil()
-    util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
-
-    val t = util.tableEnv.scan("procTimeT")
-            .select('id)
-
-    util.tableEnv.optimize(t.getRelNode, updatesAsRetraction = false)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
deleted file mode 100644
index a845f5c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/FlinkTableValidationTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-
-class FlinkTableValidationTest extends TableTestBase {
-
-  @Test
-  def testFieldNamesDuplicate() {
-
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage("Field names must be unique.\n" +
-      "List of duplicate fields: [a].\n" +
-      "List of all fields: [a, a, b].")
-
-    val util = batchTestUtil()
-    util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
new file mode 100644
index 0000000..92cae5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/InlineTableValidationTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class InlineTableValidationTest extends TableTestBase {
+
+  @Test
+  def testFieldNamesDuplicate() {
+
+    thrown.expect(classOf[TableException])
+    thrown.expectMessage("Field names must be unique.\n" +
+      "List of duplicate fields: [a].\n" +
+      "List of all fields: [a, a, b].")
+
+    val util = batchTestUtil()
+    util.addTable[(Int, Int, String)]("MyTable", 'a, 'a, 'b)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index 09a9c55..4828e86 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -18,124 +18,228 @@
 
 package org.apache.flink.table.api.validation
 
+import java.util
+import java.util.Collections
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
-import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
+import org.apache.flink.table.sources._
 import org.apache.flink.table.utils.TestTableSourceWithTime
 import org.apache.flink.types.Row
 import org.junit.Test
 
 class TableSourceValidationTest {
 
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
+  @Test(expected = classOf[ValidationException])
+  def testUnresolvedSchemaField(): Unit = {
 
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field name must no be duplicate
-      .field("myfield", Types.INT)
-  }
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
 
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, field can be empty
-      .build()
+    val schema = new TableSchema(
+      Array("id", "name", "amount", "value"),
+      Array(Types.LONG, Types.STRING, Types.INT, Types.DOUBLE))
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "amount"))
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]())
+
+    // should fail because schema field "value" cannot be resolved in result type
+    tEnv.registerTableSource("testTable", ts)
   }
 
-  @Test(expected = classOf[TableException])
-  def testNonExistingRowtimeField(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testNonMatchingFieldTypes(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.INT, Types.INT))
     val rowType = new RowTypeInfo(
       Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "name", "amount")
-    )
-    val ts = new TestTableSourceWithTime(
-      Seq[Row](),
-      rowType,
-      "rTime",
-      null
-    )
+      Array("id", "name", "amount"))
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row]())
+
+    // should fail because types of "name" fields are different
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMappingToUnknownField(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    // should fail because configured rowtime field is not in schema
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.DOUBLE))
+    val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.DOUBLE)
+    val mapping = Map("id" -> "f3", "name" -> "f1", "amount" -> "f2")
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping)
+
+    // should fail because mapping maps field "id" to unknown field
     tEnv.registerTableSource("testTable", ts)
   }
 
-  @Test(expected = classOf[TableException])
-  def testInvalidTypeRowtimeField(): Unit = {
-    val rowType = new RowTypeInfo(
-      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "name", "amount")
-    )
-    val ts = new TestTableSourceWithTime(
-      Seq[Row](),
-      rowType,
-      "name",
-      null
-    )
+  @Test(expected = classOf[ValidationException])
+  def testMappingWithInvalidFieldType(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    // should fail because configured rowtime field is not of type Long or Timestamp
+    val schema = new TableSchema(
+      Array("id", "name", "amount"),
+      Array(Types.LONG, Types.STRING, Types.DOUBLE))
+    val rowType = new RowTypeInfo(Types.LONG, Types.STRING, Types.INT)
+    val mapping = Map("id" -> "f0", "name" -> "f1", "amount" -> "f2")
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), mapping = mapping)
+
+    // should fail because mapping maps fields with different types
     tEnv.registerTableSource("testTable", ts)
   }
 
-  @Test(expected = classOf[TableException])
-  def testEmptyRowtimeField(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testNonTimestampProctimeField(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount", "ptime"),
+      Array(Types.LONG, Types.STRING, Types.INT, Types.LONG))
     val rowType = new RowTypeInfo(
       Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "name", "amount")
-    )
-    val ts = new TestTableSourceWithTime(
-      Seq[Row](),
-      rowType,
-      "",
-      null
-    )
+      Array("id", "name", "amount"))
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), proctime = "ptime")
+
+    // should fail because processing time field has invalid type
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNonTimestampRowtimeField(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    // should fail because configured rowtime field is empty
+    val schema = new TableSchema(
+      Array("id", "name", "amount", "rtime"),
+      Array(Types.LONG, Types.STRING, Types.INT, Types.LONG))
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "rtime", "amount"))
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "rtime")
+
+    // should fail because rowtime field has invalid type
     tEnv.registerTableSource("testTable", ts)
   }
 
-  @Test(expected = classOf[TableException])
-  def testEmptyProctimeField(): Unit = {
+  @Test(expected = classOf[ValidationException])
+  def testFieldRowtimeAndProctime(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount", "time"),
+      Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP))
     val rowType = new RowTypeInfo(
-      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
-      Array("id", "name", "amount")
-    )
-    val ts = new TestTableSourceWithTime(
-      Seq[Row](),
-      rowType,
-      null,
-      ""
-    )
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "time", "amount"))
+    val ts =
+      new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "time", proctime = "time")
+
+    // should fail because field "time" is configured as both rowtime and proctime attribute
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnknownTimestampExtractorArgField(): Unit = {
+    // The overridden rowtime descriptor below references a field that rowType does not contain.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val schema = new TableSchema(
+      Array("id", "name", "amount", "rtime"),
+      Array(Types.LONG, Types.STRING, Types.INT, Types.SQL_TIMESTAMP))
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.LONG, Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "rtime", "amount"))
+    val ts =
+      new TestTableSourceWithTime(schema, rowType, Seq[Row]()) {
+        // the descriptor for "rtime" uses ExistingField("doesNotExist"); rowType has no such field
+        override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
+          Collections.singletonList(new RowtimeAttributeDescriptor(
+            "rtime",
+            new ExistingField("doesNotExist"),
+            new AscendingWatermarks))
+        }
+    }
+
+    // should fail because timestamp extractor argument field does not exist
+    tEnv.registerTableSource("testTable", ts)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFailingTimestampExtractorValidation(): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
 
-    // should fail because configured proctime field is empty
+    val fieldNames = Array("id", "name", "amount")
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+    val schema = new TableSchema(
+      fieldNames,
+      Array(Types.LONG, Types.SQL_TIMESTAMP, Types.INT))
+    val ts = new TestTableSourceWithTime(schema, rowType, Seq[Row](), rowtime = "name")
+
+    // should fail because configured rowtime field is not of type Long or Timestamp
     tEnv.registerTableSource("testTable", ts)
   }
+
+  // CsvTableSource Tests
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithNullPath(): Unit = {
+    CsvTableSource.builder()
+      .field("myfield", Types.STRING)
+      // should fail because path(...) was never called on the builder
+      .build()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      .field("myfield", Types.STRING)
+      // should fail, field names must not be duplicated
+      .field("myfield", Types.INT)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
+    CsvTableSource.builder()
+      .path("/path/to/csv")
+      // should fail, at least one field must be defined before build()
+      .build()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
index 82bfd8d..0744de9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalTableSourceUtilTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.plan.schema.StreamTableSourceTable
+import org.apache.flink.table.plan.schema.{StreamTableSourceTable}
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.types.Row
 import org.junit.Assert.assertTrue
@@ -58,6 +58,9 @@ class MockTableSourceConverter extends TableSourceConverter[StreamTableSource[Ro
         val schema = externalCatalogTable.schema
         Types.ROW(schema.getColumnNames, schema.getTypes)
       }
+
+      override def getTableSchema: TableSchema = externalCatalogTable.schema
+
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
index 6d1d66f..54d1510 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/InMemoryExternalCatalogTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.catalog
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.table.api._
 import org.junit.{Before, Test}
 import org.junit.Assert._
@@ -68,7 +68,7 @@ class InMemoryExternalCatalogTest {
     val table = createTableInstance()
     catalog.createTable(tableName, table, ignoreIfExists = false)
     assertEquals(catalog.getTable(tableName), table)
-    val newTable = createTableInstance()
+    val newTable = createTableInstance(Array("number"), Array(Types.INT))
     catalog.alterTable(tableName, newTable, ignoreIfNotExists = false)
     val currentTable = catalog.getTable(tableName)
     // validate the table is really replaced after alter table
@@ -142,4 +142,11 @@ class InMemoryExternalCatalogTest {
     )
     ExternalCatalogTable("csv", schema)
   }
+
+  private def createTableInstance(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): ExternalCatalogTable = {
+    val schema = new TableSchema(fieldNames, fieldTypes) // schema with exactly the given columns
+    ExternalCatalogTable("csv", schema) // the table type is fixed to "csv"
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index 5e214b1..2292e17 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -18,13 +18,22 @@
 
 package org.apache.flink.table.runtime.batch.table
 
+import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
+
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.{ExecutionEnvironment => JExecEnv}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
 import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{CommonTestData, TableProgramsCollectionTestBase}
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.utils.TestFilterableTableSource
+import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.utils._
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -36,6 +45,33 @@ class TableSourceITCase(
     configMode: TableConfigMode)
   extends TableProgramsCollectionTestBase(configMode) {
 
+  @Test(expected = classOf[TableException])
+  def testInvalidDatastreamType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    // Declared return type is a RowTypeInfo, but the produced DataSet uses GenericTypeInfo[Row].
+    val tableSource = new BatchTableSource[Row]() {
+      private val fieldNames: Array[String] = Array("name", "id", "value")
+      private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)
+        .asInstanceOf[Array[TypeInformation[_]]]
+
+      override def getDataSet(execEnv: JExecEnv): DataSet[Row] = {
+        val data = List(Row.of("Mary", new JLong(1L), new JInt(1))).asJava
+        // return DataSet[Row] with GenericTypeInfo
+        execEnv.fromCollection(data, new GenericTypeInfo[Row](classOf[Row]))
+      }
+      override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)
+      override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
+    }
+    tEnv.registerTableSource("T", tableSource)
+
+    tEnv.scan("T")
+      .select('value, 'name)
+      .collect()
+
+    // test should fail because the DataSet's type info does not match the declared return type.
+  }
+
   @Test
   def testCsvTableSourceWithProjection(): Unit = {
     val csvTable = CommonTestData.getCsvTableSource
@@ -76,4 +112,535 @@ class TableSourceITCase(
       "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8").mkString("\n")
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testRowtimeRowTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    // "rtime" is LONG in the physical row type but SQL_TIMESTAMP in the table schema
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+    tEnv.registerTableSource(tableName, tableSource)
+    // sums amount per name over 1-second tumbling windows on rtime
+    val results = tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProctimeRowTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    // "ptime" is appended to the schema as SQL_TIMESTAMP; the row type has no such field
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(
+      fieldNames :+ "ptime",
+      Array(Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestTableSourceWithTime(schema, rowType, data, null, "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+    // the ptime filter is expected to keep every row; only name and amount are projected
+    val results = tEnv.scan(tableName)
+      .where('ptime.cast(Types.LONG) > 0L)
+      .select('name, 'amount)
+      .collect()
+
+    val expected = Seq(
+      "Mary,10",
+      "Bob,20",
+      "Mary,30",
+      "Liz,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRowtimeProctimeRowTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    // schema: "rtime" becomes SQL_TIMESTAMP and "ptime" is appended; rows only have three fields
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(
+      fieldNames :+ "ptime",
+      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT, Types.SQL_TIMESTAMP))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+    // ptime is registered but unused by the query; the windows are defined on rtime
+    val results = tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRowtimeAsTimestampRowTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", toTimestamp(1L), new JInt(10)),
+      Row.of("Bob", toTimestamp(2L), new JInt(20)),
+      Row.of("Mary", toTimestamp(2L), new JInt(30)),
+      Row.of("Liz", toTimestamp(2001L), new JInt(40)))
+    // here "rtime" is SQL_TIMESTAMP in both the row type and the table schema
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+    tEnv.registerTableSource(tableName, tableSource)
+    // sums amount per name over 1-second tumbling windows on rtime
+    val results = tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRowtimeLongTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L))
+    // records are plain Longs (atomic return type); the schema has the single field "rtime"
+    val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
+    val returnType = Types.LONG
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null)
+    tEnv.registerTableSource(tableName, tableSource)
+    // counts records per 1-second tumbling window on rtime
+    val results = tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('w)
+      .select('w.start, 'rtime.count)
+      .collect()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.0,3",
+      "1970-01-01 00:00:02.0,1",
+      "1970-01-01 00:00:04.0,1").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProctimeStringTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq("Mary", "Peter", "Bob", "Liz")
+    // records are plain Strings; the schema declares "name" plus a SQL_TIMESTAMP "ptime"
+    val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
+    val returnType = Types.STRING
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+    // the ptime filter is expected to keep every record
+    val results = tEnv.scan(tableName)
+      .where('ptime.cast(Types.LONG) > 1)
+      .select('name)
+      .collect()
+
+    val expected = Seq("Mary", "Peter", "Bob", "Liz").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRowtimeProctimeLongTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L))
+    // records are plain Longs; the schema declares "rtime" and "ptime" as SQL_TIMESTAMP
+    val schema = new TableSchema(
+      Array("rtime", "ptime"),
+      Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
+    val returnType = Types.LONG
+
+    val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime")
+    tEnv.registerTableSource(tableName, tableSource)
+    // filters on ptime, then counts records per 1-second tumbling window on rtime
+    val results = tEnv.scan(tableName)
+      .where('ptime.cast(Types.LONG) > 1)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('w)
+      .select('w.start, 'rtime.count)
+      .collect()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.0,3",
+      "1970-01-01 00:00:02.0,1",
+      "1970-01-01 00:00:04.0,1").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFieldMappingTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+    // schema order differs from the row; "amount", "name" and "rtime" are mapped to f2, f0, f1
+    val schema = new TableSchema(
+      Array("ptime", "amount", "name", "rtime"),
+      Array(Types.SQL_TIMESTAMP, Types.INT, Types.STRING, Types.SQL_TIMESTAMP))
+    val returnType = new RowTypeInfo(Types.STRING, Types.LONG, Types.INT)
+    val mapping = Map("amount" -> "f2", "name" -> "f0", "rtime" -> "f1")
+
+    val source = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime", mapping)
+    tEnv.registerTableSource(tableName, source)
+    // sums amount per name over 1-second tumbling windows on rtime
+    val results = tEnv.scan(tableName)
+      .window(Tumble over 1.second on 'rtime as 'w)
+      .groupBy('name, 'w)
+      .select('name, 'w.start, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,1970-01-01 00:00:00.0,40",
+      "Bob,1970-01-01 00:00:00.0,20",
+      "Liz,1970-01-01 00:00:02.0,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProjectWithoutRowtimeProctime(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+      Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+      Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+      Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+    // schema order is (id, rtime, val, ptime, name); rows are (id, name, val, rtime)
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "val", "rtime"))
+
+    tEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+    // neither rtime nor ptime is projected
+    val results = tEnv.scan("T")
+      .select('name, 'val, 'id)
+      .collect()
+
+    val expected = Seq(
+      "Mary,10,1",
+      "Bob,20,2",
+      "Mike,30,3",
+      "Liz,40,4").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProjectWithoutProctime(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+      Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+      Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+      Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+    // schema order is (id, rtime, val, ptime, name); rows are (id, name, val, rtime)
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "val", "rtime"))
+
+    tEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+    // rtime is projected (rendered as a timestamp in the expected rows); ptime is not
+    val results = tEnv.scan("T")
+      .select('rtime, 'name, 'id)
+      .collect()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001,Mary,1",
+      "1970-01-01 00:00:00.002,Bob,2",
+      "1970-01-01 00:00:00.002,Mike,3",
+      "1970-01-01 00:00:02.001,Liz,4").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProjectWithoutRowtime(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = Seq(
+      Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)),
+      Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)),
+      Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)),
+      Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001)))
+    // schema order is (id, rtime, val, ptime, name); rows are (id, name, val, rtime)
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.STRING, Types.LONG, Types.LONG)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "name", "val", "rtime"))
+
+    tEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+    // ptime is used only in the filter; rtime is not referenced at all
+    val results = tEnv.scan("T")
+      .filter('ptime.cast(Types.LONG) > 0)
+      .select('name, 'id)
+      .collect()
+
+    val expected = Seq(
+      "Mary,1",
+      "Bob,2",
+      "Mike,3",
+      "Liz,4").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testProjectOnlyProctime(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    // rows are (id, rtime, val, name); "ptime" exists only in the table schema
+    val data = Seq(
+      Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"),
+      Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"),
+      Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"),
+      Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz"))
+
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "rtime", "val", "name"))
+
+    tEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+    // only ptime is projected; it is cast to LONG before the comparison, as in the sibling tests
+    val results = tEnv.scan("T")
+      .select('ptime.cast(Types.LONG) > 0)
+      .select(1.count)
+      .collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, "4")
+  }
+
+  /**
+    * Projects only the `rtime` field of a [[TestProjectableTableSource]].
+    *
+    * The source delivers `rtime` as a LONG (1, 2, 2, 2001) while the table schema declares
+    * it as SQL_TIMESTAMP; the expected output is the corresponding timestamp strings.
+    */
+  @Test
+  def testProjectOnlyRowtime(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // input rows, laid out like the return type below: (id, rtime, val, name)
+    val data = Seq(
+      Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"),
+      Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"),
+      Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"),
+      Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz"))
+
+    val tableSchema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array(Types.INT, Types.LONG, Types.LONG, Types.STRING)
+        .asInstanceOf[Array[TypeInformation[_]]],
+      Array("id", "rtime", "val", "name"))
+
+    tEnv.registerTableSource(
+      "T",
+      new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
+
+    val results = tEnv.scan("T")
+      .select('rtime)
+      .collect()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.001",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:00.002",
+      "1970-01-01 00:00:02.001").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  /**
+    * Projects `name`, `rtime` and `val` from a [[TestProjectableTableSource]] whose
+    * return-type fields carry a "p-" prefix and are linked to the table schema fields
+    * through an explicit name mapping.
+    */
+  @Test
+  def testProjectWithMapping(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    // input rows, laid out like the return type below: (p-rtime, p-id, p-name, p-val)
+    val rows = Seq(
+      Row.of(new JLong(1), new JInt(1), "Mary", new JLong(10)),
+      Row.of(new JLong(2), new JInt(2), "Bob", new JLong(20)),
+      Row.of(new JLong(2), new JInt(3), "Mike", new JLong(30)),
+      Row.of(new JLong(2001), new JInt(4), "Liz", new JLong(40)))
+
+    val schema = new TableSchema(
+      Array("id", "rtime", "val", "ptime", "name"),
+      Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING))
+
+    val fieldTypes = Array[TypeInformation[_]](Types.LONG, Types.INT, Types.STRING, Types.LONG)
+    val rowType = new RowTypeInfo(fieldTypes, Array("p-rtime", "p-id", "p-name", "p-val"))
+
+    // table schema field name -> field name in the return type
+    val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name")
+
+    val source =
+      new TestProjectableTableSource(schema, rowType, rows, "rtime", "ptime", mapping)
+    tableEnv.registerTableSource("T", source)
+
+    val query = tableEnv
+      .scan("T")
+      .select('name, 'rtime, 'val)
+    val actual = query.collect()
+
+    val expected = Seq(
+      "Mary,1970-01-01 00:00:00.001,10",
+      "Bob,1970-01-01 00:00:00.002,20",
+      "Mike,1970-01-01 00:00:00.002,30",
+      "Liz,1970-01-01 00:00:02.001,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(actual.asJava, expected)
+  }
+
+  /**
+    * Selects individual leaf fields of nested and doubly nested rows from a
+    * [[TestNestedProjectableTableSource]] and checks the flattened result.
+    */
+  @Test
+  def testNestedProject(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    // nested1 = (name, value), nested2 = (num, flag), deepNested = (nested1, nested2)
+    val nested1 = new RowTypeInfo(
+      Array[TypeInformation[_]](Types.STRING, Types.INT),
+      Array("name", "value"))
+    val nested2 = new RowTypeInfo(
+      Array[TypeInformation[_]](Types.INT, Types.BOOLEAN),
+      Array("num", "flag"))
+    val deepNested = new RowTypeInfo(
+      Array[TypeInformation[_]](nested1, nested2),
+      Array("nested1", "nested2"))
+
+    // table schema and return type describe the same four fields
+    val tableSchema = new TableSchema(
+      Array("id", "deepNested", "nested", "name"),
+      Array(Types.LONG, deepNested, nested1, Types.STRING))
+    val returnType = new RowTypeInfo(
+      Array[TypeInformation[_]](Types.LONG, deepNested, nested1, Types.STRING),
+      Array("id", "deepNested", "nested", "name"))
+
+    // builds one input row: (id, ((deepName, deepValue), (num, flag)), (nestedName, nestedValue), name)
+    def record(
+        id: Long,
+        deepName: String,
+        deepValue: Int,
+        num: Int,
+        flag: Boolean,
+        nestedName: String,
+        nestedValue: Int,
+        name: String): Row = {
+      val deep = Row.of(
+        Row.of(deepName, new JInt(deepValue)),
+        Row.of(new JInt(num), new JBool(flag)))
+      val flat = Row.of(nestedName, new JInt(nestedValue))
+      Row.of(new JLong(id), deep, flat, name)
+    }
+
+    val rows = Seq(
+      record(1L, "Sarah", 100, 1000, true, "Peter", 10000, "Mary"),
+      record(2L, "Rob", 200, 2000, false, "Lucy", 20000, "Bob"),
+      record(3L, "Mike", 300, 3000, true, "Betty", 30000, "Liz"))
+
+    tableEnv.registerTableSource(
+      "T",
+      new TestNestedProjectableTableSource(tableSchema, returnType, rows))
+
+    val query = tableEnv
+      .scan("T")
+      .select('id,
+        'deepNested.get("nested1").get("name") as 'nestedName,
+        'nested.get("value") as 'nestedValue,
+        'deepNested.get("nested2").get("flag") as 'nestedFlag,
+        'deepNested.get("nested2").get("num") as 'nestedNum)
+    val actual = query.collect()
+
+    val expected = Seq(
+      "1,Sarah,10000,true,1000",
+      "2,Rob,20000,false,2000",
+      "3,Mike,30000,true,3000").mkString("\n")
+    TestBaseUtils.compareResultAsText(actual.asJava, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index e672335..b7f97f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.runtime.utils.JavaPojos.Pojo1
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.plan.TimeIndicatorConversionTest.TableFunc
-import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types}
 import org.apache.flink.table.expressions.{ExpressionParser, TimeIntervalUnit}
 import org.apache.flink.table.runtime.stream.TimeAttributesITCase.{TestPojo, TimestampWithEqualWatermark, TimestampWithEqualWatermarkPojo}
 import org.apache.flink.table.runtime.utils.StreamITCase
@@ -494,6 +494,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
   @Test
   def testTableSourceWithTimeIndicators(): Unit = {
 
+    StreamITCase.clear
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -505,15 +506,17 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
       Row.of(new JInt(4), "D", new JLong(4000L)),
       Row.of(new JInt(5), "E", new JLong(5000L)),
       Row.of(new JInt(6), "F", new JLong(6000L)))
+
+    val fieldNames = Array("a", "b", "rowtime")
+    val schema = new TableSchema(
+      fieldNames :+ "proctime",
+      Array(Types.INT, Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP))
     val rowType = new RowTypeInfo(
       Array(Types.INT, Types.STRING, Types.LONG).asInstanceOf[Array[TypeInformation[_]]],
-      Array("a", "b", "rowtime")
-    )
+      fieldNames)
 
-    tEnv.registerTableSource(
-      "testTable",
-      new TestTableSourceWithTime(rows, rowType, "rowtime", "proctime"))
-    StreamITCase.clear
+    val tableSource = new TestTableSourceWithTime(schema, rowType, rows, "rowtime", "proctime")
+    tEnv.registerTableSource("testTable", tableSource)
 
     val result = tEnv
       .scan("testTable")