Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/05 17:39:16 UTC

flink git commit: [FLINK-8203] [FLINK-7681] [table] Make schema definition of DataStream/DataSet to Table conversion more flexible

Repository: flink
Updated Branches:
  refs/heads/master f88da4d04 -> fb29898cd


[FLINK-8203] [FLINK-7681] [table] Make schema definition of DataStream/DataSet to Table conversion more flexible

This closes #5132.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb29898c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb29898c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb29898c

Branch: refs/heads/master
Commit: fb29898cd3507b2b94dd8bbf3dbfd2132b643a1d
Parents: f88da4d
Author: twalthr <tw...@apache.org>
Authored: Thu Dec 7 11:52:28 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Jan 5 18:32:37 2018 +0100

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |   4 +-
 .../table/api/StreamTableEnvironment.scala      |  96 ++-
 .../flink/table/api/TableEnvironment.scala      | 163 +++--
 .../flink/table/codegen/CodeGenerator.scala     |  19 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |  16 +-
 .../flink/table/plan/schema/InlineTable.scala   |  16 +-
 .../table/runtime/aggregate/SortUtil.scala      |   3 +-
 .../flink/table/sources/TableSourceUtil.scala   |  15 +-
 .../batch/table/JavaTableEnvironmentITCase.java |  74 +-
 .../flink/table/api/TableEnvironmentTest.scala  | 687 ++++++++++++++-----
 .../StreamTableEnvironmentValidationTest.scala  | 102 ++-
 .../TableEnvironmentValidationTest.scala        |  84 +--
 .../batch/table/TableEnvironmentITCase.scala    |  22 +-
 .../flink/table/utils/TableTestBase.scala       |   8 +-
 14 files changed, 923 insertions(+), 386 deletions(-)
----------------------------------------------------------------------
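
For context, a minimal sketch of the two schema definition modes this commit
distinguishes (the case class and variable names below are invented for
illustration; toTable/fromDataSet are the existing Scala Table API entry
points):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

case class Person(name: String, age: Int, score: Double)

object SchemaDefinitionModes {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val people = env.fromElements(Person("Anna", 56, 10000.0))

    // by position: none of 'a, 'b, 'c exists in Person, so the input fields
    // are simply renamed in their original order
    val byPosition = people.toTable(tEnv, 'a, 'b, 'c)

    // by name: the input's own field names can be projected out, reordered,
    // and renamed with "as"
    val byName = people.toTable(tEnv, 'score as 's, 'name)
  }
}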


http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index c25007b..c920d23 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -290,8 +290,10 @@ abstract class BatchTableEnvironment(
   protected def registerDataSetInternal[T](
       name: String, dataSet: DataSet[T], fields: Array[Expression]): Unit = {
 
+    val inputType = dataSet.getType
+
     val (fieldNames, fieldIndexes) = getFieldInfo[T](
-      dataSet.getType,
+      inputType,
       fields)
 
     if (fields.exists(_.isInstanceOf[TimeAttribute])) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 920da2e..9d94f54 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -28,10 +28,10 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -42,9 +42,9 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.conversion._
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
 import org.apache.flink.table.sinks._
@@ -450,37 +450,57 @@ abstract class StreamTableEnvironment(
     exprs: Array[Expression])
   : (Option[(Int, String)], Option[(Int, String)]) = {
 
-    val fieldTypes: Array[TypeInformation[_]] = streamType match {
-      case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
-      case a: AtomicType[_] => Array(a)
+    val (isRefByPos, fieldTypes) = streamType match {
+      case c: CompositeType[_] =>
+        // determine schema definition mode (by position or by name)
+        (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
+      case t: TypeInformation[_] =>
+        (false, Array(t))
     }
 
     var fieldNames: List[String] = Nil
     var rowtime: Option[(Int, String)] = None
     var proctime: Option[(Int, String)] = None
 
+    def checkRowtimeType(t: TypeInformation[_]): Unit = {
+      if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
+        throw new TableException(
+          s"The rowtime attribute can only replace a field with a valid time type, " +
+          s"such as Timestamp or Long. But was: $t")
+      }
+    }
+
     def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
       if (rowtime.isDefined) {
         throw new TableException(
           "The rowtime attribute can only be defined once in a table schema.")
       } else {
-        val mappedIdx = streamType match {
-          case pti: PojoTypeInfo[_] =>
-            pti.getFieldIndex(origName.getOrElse(name))
-          case _ => idx;
+        // if the fields are referenced by position,
+        // it is possible to replace an existing field or append the time attribute at the end
+        if (isRefByPos) {
+          // aliases are not permitted
+          if (origName.isDefined) {
+            throw new TableException(
+              s"Invalid alias '${origName.get}' because fields are referenced by position.")
+          }
+          // check type of field that is replaced
+          if (idx < fieldTypes.length) {
+            checkRowtimeType(fieldTypes(idx))
+          }
         }
-        // check type of field that is replaced
-        if (mappedIdx < 0) {
-          throw new TableException(
-            s"The rowtime attribute can only replace a valid field. " +
-              s"${origName.getOrElse(name)} is not a field of type $streamType.")
-        }
-        else if (mappedIdx < fieldTypes.length &&
-          !(TypeCheckUtils.isLong(fieldTypes(mappedIdx)) ||
-            TypeCheckUtils.isTimePoint(fieldTypes(mappedIdx)))) {
-          throw new TableException(
-            s"The rowtime attribute can only replace a field with a valid time type, " +
-              s"such as Timestamp or Long. But was: ${fieldTypes(mappedIdx)}")
+        // check reference-by-name
+        else {
+          val aliasOrName = origName.getOrElse(name)
+          streamType match {
+            // both alias and reference must have a valid type if they replace a field
+            case ct: CompositeType[_] if ct.hasField(aliasOrName) =>
+              val t = ct.getTypeAt(ct.getFieldIndex(aliasOrName))
+              checkRowtimeType(t)
+            // alias could not be found
+            case _ if origName.isDefined =>
+              throw new TableException(s"Alias '${origName.get}' must reference an existing field.")
+            case _ => // ok
+          }
         }
 
         rowtime = Some(idx, name)
@@ -492,11 +512,26 @@ abstract class StreamTableEnvironment(
           throw new TableException(
             "The proctime attribute can only be defined once in a table schema.")
       } else {
-        // check that proctime is only appended
-        if (idx < fieldTypes.length) {
-          throw new TableException(
-            "The proctime attribute can only be appended to the table schema and not replace " +
-              "an existing field. Please move it to the end of the schema.")
+        // if the fields are referenced by position,
+        // it is only possible to append the time attribute at the end
+        if (isRefByPos) {
+
+          // check that proctime is only appended
+          if (idx < fieldTypes.length) {
+            throw new TableException(
+              "The proctime attribute can only be appended to the table schema and not replace " +
+                s"an existing field. Please move '$name' to the end of the schema.")
+          }
+        }
+        // check reference-by-name
+        else {
+          streamType match {
+            // proctime attribute must not replace a field
+            case ct: CompositeType[_] if ct.hasField(name) =>
+              throw new TableException(
+                s"The proctime attribute '$name' must not replace an existing field.")
+            case _ => // ok
+          }
         }
         proctime = Some(idx, name)
       }
@@ -512,16 +547,15 @@ abstract class StreamTableEnvironment(
       case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
         extractProctime(idx, name)
 
-      case (ProctimeAttribute(Alias(UnresolvedFieldReference(_), name, _)), idx) =>
-        extractProctime(idx, name)
-
       case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
 
       case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
 
       case (e, _) =>
         throw new TableException(s"Time attributes can only be defined on field references or " +
-          s"aliases of field references. But was: $e")
+          s"aliases of valid field references. Rowtime attributes can replace existing fields, " +
+          s"proctime attributes cannot. " +
+          s"But was: $e")
     }
 
     if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
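
The rules encoded above, as a hedged sketch (Event, events, and tEnv are
assumed names; .rowtime/.proctime are the existing expression DSL):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

case class Event(id: Int, ts: Long, msg: String)

object TimeAttributeModes {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val events = env
      .fromElements(Event(1, 1000L, "hello"))
      .assignAscendingTimestamps(_.ts)

    // by position ('a, 'b, 'c, 'p do not exist in Event): 'b.rowtime replaces
    // the Long field at index 1; 'p.proctime may only be appended at the end
    val byPosition = events.toTable(tEnv, 'a, 'b.rowtime, 'c, 'p.proctime)

    // by name ('id, 'ts, 'msg reference Event's fields): 'ts.rowtime replaces
    // the existing Long field; a fresh name may carry proctime anywhere
    val byName = events.toTable(tEnv, 'id, 'ts.rowtime, 'p.proctime, 'msg)
  }
}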

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index c3cab13..6170fa1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
@@ -769,6 +769,38 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
+    * Reference input fields by name:
+    * All fields in the schema definition are referenced by name
+    * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
+    * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
+    * positions using arbitrary names (except those that exist in the result schema). This mode
+    * can be used for any input type, including POJOs.
+    *
+    * Reference input fields by position:
+    * Field references must refer to existing fields in the input type (except for
+    * renaming with alias (as)). In this mode, fields are simply renamed. Event-time attributes can
+    * replace the field on their position in the input data (if it is of correct type) or be
+    * appended at the end. Proctime attributes must be appended at the end. This mode can only be
+    * used if the input type has a defined field order (tuple, case class, Row) and none of
+    * the fields references a field of the input type.
+    */
+  protected def isReferenceByPosition(ct: CompositeType[_], fields: Array[Expression]): Boolean = {
+    if (!ct.isInstanceOf[TupleTypeInfoBase[_]]) {
+      return false
+    }
+
+    val inputNames = ct.getFieldNames
+
+    // Use the by-position mode if none of the fields exists in the input.
+    // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
+    // by position but the user might assume reordering instead of renaming.
+    fields.forall {
+      case UnresolvedFieldReference(name) => !inputNames.contains(name)
+      case _ => true
+    }
+  }
+
+  /**
     * Returns field names and field positions for a given [[TypeInformation]].
     *
     * @param inputType The TypeInformation extract the field names and positions from.
@@ -796,97 +828,83 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
-  protected[flink] def getFieldInfo[A](
+  protected def getFieldInfo[A](
       inputType: TypeInformation[A],
       exprs: Array[Expression])
     : (Array[String], Array[Int]) = {
 
     TableEnvironment.validateType(inputType)
 
+    def referenceByName(name: String, ct: CompositeType[_]): Option[Int] = {
+      val inputIdx = ct.getFieldIndex(name)
+      if (inputIdx < 0) {
+        throw new TableException(s"$name is not a field of type $ct. " +
+                s"Expected: ${ct.getFieldNames.mkString(", ")}")
+      } else {
+        Some(inputIdx)
+      }
+    }
+
     val indexedNames: Array[(Int, String)] = inputType match {
+
       case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
         throw new TableException(
           "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
             "Please specify the type of the input with a RowTypeInfo.")
-      case a: AtomicType[_] =>
-        exprs.zipWithIndex flatMap {
-          case (_: TimeAttribute, _) =>
-            None
-          case (UnresolvedFieldReference(name), idx) if idx > 0 =>
-            // only accept the first field for an atomic type
-            throw new TableException("Only the first field can reference an atomic type.")
-          case (UnresolvedFieldReference(name), idx) =>
-            // first field reference is mapped to atomic type
-            Some((0, name))
-          case _ => throw new TableException("Field reference expression requested.")
-        }
-      case t: TupleTypeInfo[A] =>
+
+      case t: TupleTypeInfoBase[A] if t.isInstanceOf[TupleTypeInfo[A]] ||
+        t.isInstanceOf[CaseClassTypeInfo[A]] || t.isInstanceOf[RowTypeInfo] =>
+
+        // determine schema definition mode (by position or by name)
+        val isRefByPos = isReferenceByPosition(t, exprs)
+
         exprs.zipWithIndex flatMap {
-          case (UnresolvedFieldReference(name), idx) =>
-            Some((idx, name))
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = t.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $t")
+          case (UnresolvedFieldReference(name: String), idx) =>
+            if (isRefByPos) {
+              Some((idx, name))
+            } else {
+              referenceByName(name, t).map((_, name))
             }
-            Some((idx, name))
-          case (_: TimeAttribute, _) =>
-            None
-          case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
-        }
-      case c: CaseClassTypeInfo[A] =>
-        exprs.zipWithIndex flatMap {
-          case (UnresolvedFieldReference(name), idx) =>
-            Some((idx, name))
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = c.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $c")
+          case (Alias(UnresolvedFieldReference(origName), name: String, _), _) =>
+            if (isRefByPos) {
+              throw new TableException(
+                s"Alias '$name' is not allowed if other fields are referenced by position.")
+            } else {
+              referenceByName(origName, t).map((_, name))
             }
-            Some((idx, name))
           case (_: TimeAttribute, _) =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
+
       case p: PojoTypeInfo[A] =>
         exprs flatMap {
-          case (UnresolvedFieldReference(name)) =>
-            val idx = p.getFieldIndex(name)
-            if (idx < 0) {
-              throw new TableException(s"$name is not a field of type $p")
-            }
-            Some((idx, name))
-          case Alias(UnresolvedFieldReference(origName), name, _) =>
-            val idx = p.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $p")
-            }
-            Some((idx, name))
+          case (UnresolvedFieldReference(name: String)) =>
+            referenceByName(name, p).map((_, name))
+          case Alias(UnresolvedFieldReference(origName), name: String, _) =>
+            referenceByName(origName, p).map((_, name))
           case _: TimeAttribute =>
             None
           case _ => throw new TableException(
             "Field reference expression or alias on field expression expected.")
         }
-      case r: RowTypeInfo =>
-        exprs.zipWithIndex flatMap {
-          case (UnresolvedFieldReference(name), idx) =>
-            Some((idx, name))
-          case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
-            val idx = r.getFieldIndex(origName)
-            if (idx < 0) {
-              throw new TableException(s"$origName is not a field of type $r")
-            }
-            Some((idx, name))
-          case (_: TimeAttribute, _) =>
+
+      case _: TypeInformation[_] => // atomic or other custom type information
+        var referenced = false
+        exprs flatMap {
+          case _: TimeAttribute =>
             None
+          case UnresolvedFieldReference(_) if referenced =>
+            // only accept the first field for an atomic type
+            throw new TableException("Only the first field can reference an atomic type.")
+          case UnresolvedFieldReference(name: String) =>
+            referenced = true
+            // first field reference is mapped to atomic type
+            Some((0, name))
           case _ => throw new TableException(
-            "Field reference expression or alias on field expression expected.")
+            "Field reference expression expected.")
         }
-
-      case tpe => throw new TableException(
-        s"Source of type $tpe cannot be converted into Table.")
     }
 
     val (fieldIndexes, fieldNames) = indexedNames.unzip
@@ -967,17 +985,17 @@ abstract class TableEnvironment(val config: TableConfig) {
             }
         }
 
-      // Atomic type requested
-      case at: AtomicType[_] =>
+      // atomic type requested
+      case t: TypeInformation[_] =>
         if (fieldTypes.size != 1) {
           throw new TableException(s"Requested result type is an atomic type but " +
             s"result[$fieldTypes] has more or less than a single field.")
         }
         val requestedTypeInfo = fieldTypes.head
         validateFieldType(requestedTypeInfo)
-        if (requestedTypeInfo != at) {
+        if (requestedTypeInfo != t) {
           throw new TableException(s"Result field does not match requested type. " +
-            s"Requested: $at; Actual: $requestedTypeInfo")
+            s"Requested: $t; Actual: $requestedTypeInfo")
         }
 
       case _ =>
@@ -1119,10 +1137,7 @@ object TableEnvironment {
 
     val fieldNames: Array[String] = inputType match {
       case t: CompositeType[_] => t.getFieldNames
-      case a: AtomicType[_] => Array("f0")
-      case tpe =>
-        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
-          s"Type $tpe lacks explicit field naming")
+      case _: TypeInformation[_] => Array("f0")
     }
 
     if (fieldNames.contains("*")) {
@@ -1167,10 +1182,8 @@ object TableEnvironment {
     validateType(inputType)
 
     inputType match {
-      case t: CompositeType[_] => 0.until(t.getArity).map(i => t.getTypeAt(i)).toArray
-      case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]])
-      case tpe =>
-        throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
+      case ct: CompositeType[_] => 0.until(ct.getArity).map(i => ct.getTypeAt(i)).toArray
+      case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
     }
   }
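
The mode decision above boils down to a single predicate. A standalone
restatement in plain Scala (simplified signature, not the Flink types; the
real method additionally requires a tuple-like input):

def isReferenceByPosition(inputFieldNames: Seq[String],
                          referencedNames: Seq[String]): Boolean =
  referencedNames.forall(n => !inputFieldNames.contains(n))

// for a Tuple3 with input fields f0, f1, f2:
val input = Seq("f0", "f1", "f2")
assert(isReferenceByPosition(input, Seq("a", "b", "c")))  // rename by position
assert(!isReferenceByPosition(input, Seq("f2", "f0")))    // project/reorder by name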
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 40ea5b2..df7ef57 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -370,10 +370,10 @@ abstract class CodeGenerator(
           case _ => // ok
         }
 
-      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
+      case t: TypeInformation[_] if t != fieldExprs.head.resultType =>
         throw new CodeGenException(
           s"Incompatible types of expression and result type. Expression [${fieldExprs.head}] " +
-          s"type is [${fieldExprs.head.resultType}], result type is [$at]")
+          s"type is [${fieldExprs.head.resultType}], result type is [$t]")
 
       case _ => // ok
     }
@@ -531,7 +531,7 @@ abstract class CodeGenerator(
 
         GeneratedExpression(resultTerm, "false", resultCode, returnType)
 
-      case a: AtomicType[_] =>
+      case t: TypeInformation[_] =>
         val fieldExpr = boxedFieldExprs.head
         val nullCheckCode = if (nullCheck) {
           s"""
@@ -1079,8 +1079,7 @@ abstract class CodeGenerator(
 
     val fieldType = inputType match {
       case ct: CompositeType[_] => ct.getTypeAt(index)
-      case at: AtomicType[_] => at
-      case _ => throw new CodeGenException("Unsupported type for input field access.")
+      case t: TypeInformation[_] => t
     }
     val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldType)
     val defaultValue = primitiveDefaultValue(fieldType)
@@ -1110,6 +1109,7 @@ abstract class CodeGenerator(
       index: Int)
     : GeneratedExpression = {
     inputType match {
+
       case ct: CompositeType[_] =>
         val accessor = fieldAccessorFor(ct, index)
         val fieldType: TypeInformation[Any] = ct.getTypeAt(index)
@@ -1156,13 +1156,10 @@ abstract class CodeGenerator(
             }
         }
 
-      case at: AtomicType[_] =>
-        val fieldTypeTerm = boxedTypeTermForTypeInfo(at)
+      case t: TypeInformation[_] =>
+        val fieldTypeTerm = boxedTypeTermForTypeInfo(t)
         val inputCode = s"($fieldTypeTerm) $inputTerm"
-        generateInputFieldUnboxing(at, inputCode)
-
-      case _ =>
-        throw new CodeGenException("Unsupported type for input field access.")
+        generateInputFieldUnboxing(t, inputCode)
     }
   }
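
Most hunks in this file apply the same refactoring: the former AtomicType case
becomes a TypeInformation catch-all. This only works because the CompositeType
case is matched first (CompositeType is itself a TypeInformation). A minimal
standalone analogue of the order-dependent match:

trait Info
class CompositeInfo extends Info
class GenericInfo extends Info

def describe(info: Info): String = info match {
  case _: CompositeInfo => "composite: generate one accessor per field"
  case _: Info          => "non-composite: treat the whole value as one field"
}

// describe(new GenericInfo) takes the non-composite branch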
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index cab8ea9..df6fd29 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -23,7 +23,7 @@ import java.util.Collections
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.{FunctionParameter, TableFunction}
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -52,19 +52,21 @@ class FlinkTableFunctionImpl[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
-      case cType: CompositeType[T] =>
-        if (fieldNames.length != cType.getArity) {
+
+      case ct: CompositeType[T] =>
+        if (fieldNames.length != ct.getArity) {
           throw new TableException(
-            s"Arity of type (" + cType.getFieldNames.deep + ") " +
+            s"Arity of type (" + ct.getFieldNames.deep + ") " +
               "not equal to number of field names " + fieldNames.deep + ".")
         }
-        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
+        fieldIndexes.map(ct.getTypeAt(_).asInstanceOf[TypeInformation[_]])
+
+      case t: TypeInformation[T] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
           throw new TableException(
             "Non-composite input type may have only a single field and its index must be 0.")
         }
-        Array(aType)
+        Array(t)
     }
 
   override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
index 22d6151..7e61fdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/InlineTable.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.schema
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -59,11 +59,12 @@ abstract class InlineTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
-      case cType: CompositeType[_] =>
+
+      case ct: CompositeType[_] =>
         // it is ok to leave out fields
-        if (fieldIndexes.count(_ >= 0) > cType.getArity) {
+        if (fieldIndexes.count(_ >= 0) > ct.getArity) {
           throw new TableException(
-          s"Arity of type (" + cType.getFieldNames.deep + ") " +
+          s"Arity of type (" + ct.getFieldNames.deep + ") " +
             "must not be greater than number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes.map {
@@ -75,8 +76,9 @@ abstract class InlineTable[T](
             Types.SQL_TIMESTAMP
           case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
             Types.SQL_TIMESTAMP
-          case i => cType.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
-      case aType: AtomicType[_] =>
+          case i => ct.getTypeAt(i).asInstanceOf[TypeInformation[_]]}
+
+      case t: TypeInformation[_] =>
         var cnt = 0
         val types = fieldIndexes.map {
           case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER =>
@@ -89,7 +91,7 @@ abstract class InlineTable[T](
             Types.SQL_TIMESTAMP
           case _ =>
             cnt += 1
-            aType.asInstanceOf[TypeInformation[_]]
+            t.asInstanceOf[TypeInformation[_]]
         }
         // ensure that the atomic type is matched at most once.
         if (cnt > 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index d62c7b9..2bafed9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -135,7 +135,8 @@ object SortUtil {
 
     val fieldComps = for ((k, o) <- sortFields.zip(sortDirections)) yield {
       FlinkTypeFactory.toTypeInfo(inputType.getFieldList.get(k).getType) match {
-        case a: AtomicType[AnyRef] => a.createComparator(o, execConfig)
+        case a: AtomicType[_] =>
+          a.createComparator(o, execConfig).asInstanceOf[TypeComparator[AnyRef]]
         case x: TypeInformation[_] =>  
           throw new TableException(s"Unsupported field type $x to sort on.")
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 48ab3de..6895419 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.logical.LogicalValues
 import org.apache.calcite.rex.{RexLiteral, RexNode}
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -95,7 +95,7 @@ object TableSourceUtil {
         mappedFieldCnt += 1
     }
     // ensure that only one field is mapped to an atomic type
-    if (tableSource.getReturnType.isInstanceOf[AtomicType[_]] && mappedFieldCnt > 1) {
+    if (!tableSource.getReturnType.isInstanceOf[CompositeType[_]] && mappedFieldCnt > 1) {
       throw ValidationException(
         s"More than one table field matched to atomic input type ${tableSource.getReturnType}.")
     }
@@ -231,7 +231,7 @@ object TableSourceUtil {
     }
 
     // ensure that only one field is mapped to an atomic type
-    if (inputType.isInstanceOf[AtomicType[_]] && mapping.count(_ >= 0) > 1) {
+    if (!inputType.isInstanceOf[CompositeType[_]] && mapping.count(_ >= 0) > 1) {
       throw ValidationException(
         s"More than one table field matched to atomic input type $inputType.")
     }
@@ -466,9 +466,7 @@ object TableSourceUtil {
     /** Look up a field by name in a type information */
     def lookupField(fieldName: String, failMsg: String): (String, Int, TypeInformation[_]) = {
       returnType match {
-        case a: AtomicType[_] =>
-          // no composite type, we return the full atomic type as field
-          (fieldName, 0, a)
+
         case c: CompositeType[_] =>
           // get and check field index
           val idx = c.getFieldIndex(fieldName)
@@ -477,7 +475,10 @@ object TableSourceUtil {
           }
           // return field name, index, and field type
           (fieldName, idx, c.getTypeAt(idx))
-        case _ => throw TableException("Unexpected type information.")
+
+        case t: TypeInformation[_] =>
+          // no composite type, we return the full atomic type as field
+          (fieldName, 0, t)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
index be7879a..4dc58b2 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.batch.table;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -36,6 +37,7 @@ import org.apache.flink.table.calcite.CalciteConfigBuilder;
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase;
 import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Either;
 import org.apache.flink.types.Row;
 
 import org.apache.calcite.tools.RuleSets;
@@ -177,7 +179,7 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
 	}
 
 	@Test
-	public void testAsFromTuple() throws Exception {
+	public void testAsFromTupleByPosition() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
@@ -198,6 +200,25 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
 	}
 
 	@Test
+	public void testAsFromTupleByName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "f2");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi\n" + "Hello\n" + "Hello world\n" +
+			"Hello world, how are you?\n" + "I am fine.\n" + "Luke Skywalker\n" +
+			"Comment#1\n" + "Comment#2\n" + "Comment#3\n" + "Comment#4\n" +
+			"Comment#5\n" + "Comment#6\n" + "Comment#7\n" +
+			"Comment#8\n" + "Comment#9\n" + "Comment#10\n" +
+			"Comment#11\n" + "Comment#12\n" + "Comment#13\n" +
+			"Comment#14\n" + "Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
 	public void testAsFromAndToTuple() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
@@ -273,6 +294,57 @@ public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase
 	}
 
 	@Test
+	public void testFromNonAtomicAndNonComposite() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Either<String, Integer>> data = new ArrayList<>();
+		data.add(new Either.Left<>("Hello"));
+		data.add(new Either.Right<>(42));
+		data.add(new Either.Left<>("World"));
+
+		Table table = tableEnv
+			.fromDataSet(
+				env.fromCollection(
+					data,
+					TypeInformation.of(new TypeHint<Either<String, Integer>>() { })
+				),
+				"either")
+			.select("either");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Left(Hello)\n" +
+			"Left(World)\n" +
+			"Right(42)\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPojoProjected() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data), "name AS d")
+			.select("d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Peter\n" +
+			"Anna\n" +
+			"Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
 	public void testAsFromPrivateFieldsPojo() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
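
testFromNonAtomicAndNonComposite above exercises the key new case: Either is
neither an AtomicType nor a CompositeType, yet it can now be converted into a
single-column table. A hedged Scala sketch of the same idea (names invented):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object GenericTypeToTable {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // a non-composite input type becomes a single-column table; at most one
    // field name may be given and it maps to the whole value
    val data = env.fromElements[Either[String, Int]](Left("Hello"), Right(42))
    val table = tEnv.fromDataSet(data, 'either).select('either)

    tEnv.toDataSet[Row](table).print()
  }
}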

http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index ad84a03..1c097d3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -18,231 +18,556 @@
 
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala._
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase}
+import org.apache.flink.table.api.TableEnvironmentTest._
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{PROCTIME_INDICATOR => PROCTIME}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo.{ROWTIME_INDICATOR => ROWTIME}
+import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.api.scala.typeutils.UnitTypeInfo
 import org.junit.Test
 
 class TableEnvironmentTest extends TableTestBase {
 
-  val tEnv = new MockTableEnvironment
-
-  val tupleType = new TupleTypeInfo(
-    INT_TYPE_INFO,
-    STRING_TYPE_INFO,
-    DOUBLE_TYPE_INFO)
-
-  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
-  val cRowType = new CRowTypeInfo(rowType)
-
-  val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
-  val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
-  val atomicType = INT_TYPE_INFO
-
-  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+  // ----------------------------------------------------------------------------------------------
+  // schema definition by position
+  // ----------------------------------------------------------------------------------------------
 
   @Test
-  def testGetFieldInfoRow(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(rowType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testProjectByPosition(): Unit = {
+    val utils = Seq(streamTestUtil(), batchTestUtil())
+
+    utils.foreach { util =>
+
+      // case class
+      util.verifySchema(
+        util.addTable[CClass]('a, 'b, 'c),
+        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[CClass]('a, 'b),
+        Seq("a" -> INT, "b" -> STRING))
+
+      util.verifySchema(
+        util.addTable[CClass]('a),
+        Seq("a" -> INT))
+
+      // row
+      util.verifySchema(
+        util.addTable('a, 'b, 'c)(TEST_ROW),
+        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable('a, 'b)(TEST_ROW),
+        Seq("a" -> INT, "b" -> STRING))
+
+      util.verifySchema(
+        util.addTable('a)(TEST_ROW),
+        Seq("a" -> INT))
+
+      // tuple
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c),
+        Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('a, 'b),
+        Seq("a" -> INT, "b" -> STRING))
+
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('a),
+        Seq("a" -> INT))
+    }
   }
 
   @Test
-  def testGetFieldInfoRowNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      rowType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamProjectWithAddingTimeAttributesByPosition(): Unit = {
+    val util = streamTestUtil()
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClass]('a, 'b, 'c, 'proctime.proctime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+    util.verifySchema(
+      util.addTable[CClass]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
+
+    // row
+    util.verifySchema(
+      util.addTable('a, 'b, 'c, 'proctime.proctime)(TEST_ROW),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable('a, 'b, 'c, 'rowtime.rowtime)(TEST_ROW),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+    util.verifySchema(
+      util.addTable('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime)(TEST_ROW),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'proctime.proctime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('a, 'b, 'c, 'rowtime.rowtime, 'proctime.proctime),
+      Seq("a" -> INT, "b" -> STRING, "c" -> DOUBLE, "rowtime" -> ROWTIME, "proctime" -> PROCTIME))
   }
 
   @Test
-  def testGetFieldInfoTuple(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(tupleType)
-
-    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamAliasWithReplacingTimeAttributesByPosition(): Unit = {
+    val util = streamTestUtil()
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+    util.verifySchema(
+      util.addTable[CClassWithTime]('a, 'b.rowtime, 'c),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+    // row
+    util.verifySchema(
+      util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+    util.verifySchema(
+      util.addTable('a, 'b.rowtime, 'c)(TEST_ROW_WITH_TIME),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('a, 'b.rowtime, 'c),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('a, 'b.rowtime, 'c),
+      Seq("a" -> INT, "b" -> ROWTIME, "c" -> STRING))
   }
 
-  @Test
-  def testGetFieldInfoCClass(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(caseClassType)
+  // ----------------------------------------------------------------------------------------------
+  // schema definition by name
+  // ----------------------------------------------------------------------------------------------
 
-    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  @Test
+  def testProjectByName(): Unit = {
+    val utils = Seq(streamTestUtil(), batchTestUtil())
+
+    utils.foreach { util =>
+
+      // atomic
+      util.verifySchema(
+        util.addTable[Int](),
+        Seq("f0" -> INT))
+
+      util.verifySchema(
+        util.addTable[Int]('myint),
+        Seq("myint" -> INT))
+
+      // case class
+      util.verifySchema(
+        util.addTable[CClass](),
+        Seq("cf1" -> INT, "cf2" -> STRING, "cf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[CClass]('cf1, 'cf2),
+        Seq("cf1" -> INT, "cf2" -> STRING))
+
+      util.verifySchema(
+        util.addTable[CClass]('cf1, 'cf3),
+        Seq("cf1" -> INT, "cf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[CClass]('cf3, 'cf1),
+        Seq("cf3" -> DOUBLE, "cf1" -> INT))
+
+      // row
+      util.verifySchema(
+        util.addTable()(TEST_ROW),
+        Seq("rf1" -> INT, "rf2" -> STRING, "rf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable('rf1, 'rf2)(TEST_ROW),
+        Seq("rf1" -> INT, "rf2" -> STRING))
+
+      util.verifySchema(
+        util.addTable('rf1, 'rf3)(TEST_ROW),
+        Seq("rf1" -> INT, "rf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable('rf3, 'rf1)(TEST_ROW),
+        Seq("rf3" -> DOUBLE, "rf1" -> INT))
+
+      // tuple
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]](),
+        Seq("f0" -> INT, "f1" -> STRING, "f2" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('f0, 'f1),
+        Seq("f0" -> INT, "f1" -> STRING))
+
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('f0, 'f2),
+        Seq("f0" -> INT, "f2" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('f2, 'f0),
+        Seq("f2" -> DOUBLE, "f0" -> INT))
+
+      // pojo
+      util.verifySchema(
+        util.addTable[PojoClass](),
+        Seq("pf1" -> INT, "pf2" -> STRING, "pf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[PojoClass]('pf1, 'pf2),
+        Seq("pf1" -> INT, "pf2" -> STRING))
+
+      util.verifySchema(
+        util.addTable[PojoClass]('pf1, 'pf3),
+        Seq("pf1" -> INT, "pf3" -> DOUBLE))
+
+      util.verifySchema(
+        util.addTable[PojoClass]('pf3, 'pf1),
+        Seq("pf3" -> DOUBLE, "pf1" -> INT))
+
+      // generic
+      util.verifySchema(
+        util.addTable[Class[_]]('mygeneric),
+        Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+      util.verifySchema(
+        util.addTable[Class[_]](),
+        Seq("f0" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+      // any type info
+      util.verifySchema(
+        util.addTable[Unit](),
+        Seq("f0" -> new UnitTypeInfo()))
+
+      util.verifySchema(
+        util.addTable[Unit]('unit),
+        Seq("unit" -> new UnitTypeInfo()))
+    }
   }
 
   @Test
-  def testGetFieldInfoPojo(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(pojoType)
-
-    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamProjectWithAddingTimeAttributesByName(): Unit = {
+    val util = streamTestUtil()
+
+    // atomic
+    util.verifySchema(
+      util.addTable[Int]('proctime.proctime, 'myint),
+      Seq("proctime" -> PROCTIME, "myint" -> INT))
+
+    util.verifySchema(
+      util.addTable[Int]('rowtime.rowtime, 'myint),
+      Seq("rowtime" -> ROWTIME, "myint" -> INT))
+
+    util.verifySchema(
+      util.addTable[Int]('myint, 'proctime.proctime),
+      Seq("myint" -> INT, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[Int]('myint, 'rowtime.rowtime),
+      Seq("myint" -> INT, "rowtime" -> ROWTIME))
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClass]('proctime.proctime, 'cf1, 'cf3),
+      Seq("proctime" -> PROCTIME, "cf1" -> INT, "cf3" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[CClass]('rowtime.rowtime, 'cf3, 'cf1),
+      Seq("rowtime" -> ROWTIME, "cf3" -> DOUBLE, "cf1" -> INT))
+
+    util.verifySchema(
+      util.addTable[CClass]('cf1, 'proctime.proctime, 'cf3),
+      Seq("cf1" -> INT, "proctime" -> PROCTIME, "cf3" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[CClass]('cf3, 'rowtime.rowtime, 'cf1),
+      Seq("cf3" -> DOUBLE, "rowtime" -> ROWTIME, "cf1" -> INT))
+
+    util.verifySchema(
+      util.addTable[CClass]('cf1, 'cf3, 'proctime.proctime),
+      Seq("cf1" -> INT, "cf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[CClass]('cf3, 'cf1, 'rowtime.rowtime),
+      Seq("cf3" -> DOUBLE, "cf1" -> INT, "rowtime" -> ROWTIME))
+
+    // row
+    util.verifySchema(
+      util.addTable('proctime.proctime, 'rf1, 'rf3)(TEST_ROW),
+      Seq("proctime" -> PROCTIME, "rf1" -> INT, "rf3" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable('rowtime.rowtime, 'rf3, 'rf1)(TEST_ROW),
+      Seq("rowtime" -> ROWTIME, "rf3" -> DOUBLE, "rf1" -> INT))
+
+    util.verifySchema(
+      util.addTable('rf3, 'proctime.proctime, 'rf1)(TEST_ROW),
+      Seq("rf3" -> DOUBLE, "proctime" -> PROCTIME, "rf1" -> INT))
+
+    util.verifySchema(
+      util.addTable('rf3, 'rowtime.rowtime, 'rf1)(TEST_ROW),
+      Seq("rf3" -> DOUBLE, "rowtime" -> ROWTIME, "rf1" -> INT))
+
+    util.verifySchema(
+      util.addTable('rf3, 'rf1, 'proctime.proctime)(TEST_ROW),
+      Seq("rf3" -> DOUBLE, "rf1" -> INT, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable('rf3, 'rf1, 'rowtime.rowtime)(TEST_ROW),
+      Seq("rf3" -> DOUBLE, "rf1" -> INT, "rowtime" -> ROWTIME))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('proctime.proctime, 'f0, 'f2),
+      Seq("proctime" -> PROCTIME, "f0" -> INT, "f2" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('rowtime.rowtime, 'f2, 'f0),
+      Seq("rowtime" -> ROWTIME, "f2" -> DOUBLE, "f0" -> INT))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('f0, 'proctime.proctime, 'f2),
+      Seq("f0" -> INT, "proctime" -> PROCTIME, "f2" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('f2, 'rowtime.rowtime, 'f0),
+      Seq("f2" -> DOUBLE, "rowtime" -> ROWTIME, "f0" -> INT))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('f0, 'f2, 'proctime.proctime),
+      Seq("f0" -> INT, "f2" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, String, Double]]('f2, 'f0, 'rowtime.rowtime),
+      Seq("f2" -> DOUBLE, "f0" -> INT, "rowtime" -> ROWTIME))
+
+    // pojo
+    util.verifySchema(
+      util.addTable[PojoClass]('proctime.proctime, 'pf1, 'pf3),
+      Seq("proctime" -> PROCTIME, "pf1" -> INT, "pf3" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[PojoClass]('rowtime.rowtime, 'pf3, 'pf1),
+      Seq("rowtime" -> ROWTIME, "pf3" -> DOUBLE, "pf1" -> INT))
+
+    util.verifySchema(
+      util.addTable[PojoClass]('pf1, 'proctime.proctime, 'pf3),
+      Seq("pf1" -> INT, "proctime" -> PROCTIME, "pf3" -> DOUBLE))
+
+    util.verifySchema(
+      util.addTable[PojoClass]('pf3, 'rowtime.rowtime, 'pf1),
+      Seq("pf3" -> DOUBLE, "rowtime" -> ROWTIME, "pf1" -> INT))
+
+    util.verifySchema(
+      util.addTable[PojoClass]('pf1, 'pf3, 'proctime.proctime),
+      Seq("pf1" -> INT, "pf3" -> DOUBLE, "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[PojoClass]('pf3, 'pf1, 'rowtime.rowtime),
+      Seq("pf3" -> DOUBLE, "pf1" -> INT, "rowtime" -> ROWTIME))
+
+    // generic
+    util.verifySchema(
+      util.addTable[Class[_]]('proctime.proctime, 'mygeneric),
+      Seq("proctime" -> PROCTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+    util.verifySchema(
+      util.addTable[Class[_]]('rowtime.rowtime, 'mygeneric),
+      Seq("rowtime" -> ROWTIME, "mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]])))
+
+    util.verifySchema(
+      util.addTable[Class[_]]('mygeneric, 'proctime.proctime),
+      Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[Class[_]]('mygeneric, 'rowtime.rowtime),
+      Seq("mygeneric" -> new GenericTypeInfo[Class[_]](classOf[Class[_]]), "rowtime" -> ROWTIME))
+
+    // any type info
+    util.verifySchema(
+      util.addTable[Unit]('proctime.proctime, 'unit),
+      Seq("proctime" -> PROCTIME, "unit" -> new UnitTypeInfo()))
+
+    util.verifySchema(
+      util.addTable[Unit]('rowtime.rowtime, 'unit),
+      Seq("rowtime" -> ROWTIME, "unit" -> new UnitTypeInfo()))
+
+    util.verifySchema(
+      util.addTable[Unit]('unit, 'proctime.proctime),
+      Seq("unit" -> new UnitTypeInfo(), "proctime" -> PROCTIME))
+
+    util.verifySchema(
+      util.addTable[Unit]('unit, 'rowtime.rowtime),
+      Seq("unit" -> new UnitTypeInfo(), "rowtime" -> ROWTIME))
   }
 
   @Test
-  def testGetFieldInfoAtomic(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(atomicType)
-
-    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamProjectWithReplacingTimeAttributesByName(): Unit = {
+    val util = streamTestUtil()
+
+    // atomic
+    util.verifySchema(
+      util.addTable[Long]('new.rowtime),
+      Seq("new" -> ROWTIME))
+
+    util.verifySchema(
+      util.addTable[Int]('new.proctime),
+      Seq("new" -> PROCTIME))
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClassWithTime]('cf1, 'xxx.proctime, 'cf3),
+      Seq("cf1" -> INT, "xxx" -> PROCTIME, "cf3" -> STRING))
+
+    util.verifySchema(
+      util.addTable[CClassWithTime]('cf1, 'cf2.rowtime, 'cf3),
+      Seq("cf1" -> INT, "cf2" -> ROWTIME, "cf3" -> STRING))
+
+    // row
+    util.verifySchema(
+      util.addTable('rf1, 'xxx.proctime, 'rf3)(TEST_ROW_WITH_TIME),
+      Seq("rf1" -> INT, "xxx" -> PROCTIME, "rf3" -> STRING))
+
+    util.verifySchema(
+      util.addTable('rf1, 'rf2.rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+      Seq("rf1" -> INT, "rf2" -> ROWTIME, "rf3" -> STRING))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'xxx.proctime, 'f2),
+      Seq("f0" -> INT, "xxx" -> PROCTIME, "f2" -> STRING))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'f1.rowtime, 'f2),
+      Seq("f0" -> INT, "f1" -> ROWTIME, "f2" -> STRING))
   }
 
   @Test
-  def testGetFieldInfoTupleNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testAliasByName(): Unit = {
+    val utils = Seq(streamTestUtil(), batchTestUtil())
+
+    utils.foreach { util =>
+
+      // case class
+      util.verifySchema(
+        util.addTable[CClass]('cf1, 'cf3 as 'new, 'cf2),
+        Seq("cf1" -> INT, "new" -> DOUBLE, "cf2" -> STRING))
+
+      // row
+      util.verifySchema(
+        util.addTable('rf1, 'rf3 as 'new, 'rf2)(TEST_ROW),
+        Seq("rf1" -> INT, "new" -> DOUBLE, "rf2" -> STRING))
+
+      // tuple
+      util.verifySchema(
+        util.addTable[JTuple3[Int, String, Double]]('f0, 'f2 as 'new, 'f1),
+        Seq("f0" -> INT, "new" -> DOUBLE, "f1" -> STRING))
+    }
   }
 
   @Test
-  def testGetFieldInfoCClassNames(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamAliasWithAddingTimeAttributesByName(): Unit = {
+    val util = streamTestUtil()
+
+    // atomic
+    util.verifySchema(
+      util.addTable[Int]('new.proctime),
+      Seq("new" -> PROCTIME))
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClassWithTime]('cf1, 'new.proctime, 'cf2),
+      Seq("cf1" -> INT, "new" -> PROCTIME, "cf2" -> LONG))
+
+    util.verifySchema(
+      util.addTable[CClassWithTime]('cf1, 'new.rowtime, 'cf2),
+      Seq("cf1" -> INT, "new" -> ROWTIME, "cf2" -> LONG))
+
+    // row
+    util.verifySchema(
+      util.addTable('rf1, 'new.proctime, 'rf2)(TEST_ROW_WITH_TIME),
+      Seq("rf1" -> INT, "new" -> PROCTIME, "rf2" -> LONG))
+
+    util.verifySchema(
+      util.addTable('rf1, 'new.rowtime, 'rf2)(TEST_ROW_WITH_TIME),
+      Seq("rf1" -> INT, "new" -> ROWTIME, "rf2" -> LONG))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'new.proctime, 'f1),
+      Seq("f0" -> INT, "new" -> PROCTIME, "f1" -> LONG))
+
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('f0, 'new.rowtime, 'f1),
+      Seq("f0" -> INT, "new" -> ROWTIME, "f1" -> LONG))
   }
 
   @Test
-  def testGetFieldInfoPojoNames2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("pf3"),
-        UnresolvedFieldReference("pf1"),
-        UnresolvedFieldReference("pf2")
-      ))
-
-    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  def testStreamAliasWithReplacingTimeAttributesByName(): Unit = {
+    val util = streamTestUtil()
+
+    // case class
+    util.verifySchema(
+      util.addTable[CClassWithTime]('cf1, ('cf2 as 'new).rowtime, 'cf3),
+      Seq("cf1" -> INT, "new" -> ROWTIME, "cf3" -> STRING))
+
+    // row
+    util.verifySchema(
+      util.addTable('rf1, ('rf2 as 'new).rowtime, 'rf3)(TEST_ROW_WITH_TIME),
+      Seq("rf1" -> INT, "new" -> ROWTIME, "rf3" -> STRING))
+
+    // tuple
+    util.verifySchema(
+      util.addTable[JTuple3[Int, Long, String]]('f0, ('f1 as 'new).rowtime, 'f2),
+      Seq("f0" -> INT, "new" -> ROWTIME, "f2" -> STRING))
   }
+}
 
-  @Test
-  def testGetFieldInfoAtomicName1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      atomicType,
-      Array(UnresolvedFieldReference("name")))
+object TableEnvironmentTest {
 
-    fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
-  }
+  case class CClass(cf1: Int, cf2: String, cf3: Double)
 
-  @Test
-  def testGetFieldInfoTupleAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f0"), "name1"),
-        Alias(UnresolvedFieldReference("f1"), "name2"),
-        Alias(UnresolvedFieldReference("f2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
+  case class CClassWithTime(cf1: Int, cf2: Long, cf3: String)
 
-  @Test
-  def testGetFieldInfoTupleAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("f2"), "name1"),
-        Alias(UnresolvedFieldReference("f0"), "name2"),
-        Alias(UnresolvedFieldReference("f1"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  class PojoClass(var pf2: String, var pf1: Int, var pf3: Double) {
+    def this() = this("", 0, 0.0)
   }
 
-  @Test
-  def testGetFieldInfoCClassAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf1"), "name1"),
-        Alias(UnresolvedFieldReference("cf2"), "name2"),
-        Alias(UnresolvedFieldReference("cf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  class PojoClassWithTime(var pf2: String, var pf1: Int, var pf3: Long) {
+    def this() = this("", 0, 0L)
   }
 
-  @Test
-  def testGetFieldInfoCClassAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("cf3"), "name1"),
-        Alias(UnresolvedFieldReference("cf1"), "name2"),
-        Alias(UnresolvedFieldReference("cf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
+  val TEST_ROW: TypeInformation[Row] = ROW(
+    Array("rf1", "rf2", "rf3"),
+    Array[TypeInformation[_]](INT, STRING, DOUBLE))
 
-  @Test
-  def testGetFieldInfoPojoAlias1(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf1"), "name1"),
-        Alias(UnresolvedFieldReference("pf2"), "name2"),
-        Alias(UnresolvedFieldReference("pf3"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
-  }
+  val TEST_ROW_WITH_TIME: TypeInformation[Row] = ROW(
+    Array("rf1", "rf2", "rf3"),
+    Array[TypeInformation[_]](INT, LONG, STRING))
 
-  @Test
-  def testGetFieldInfoPojoAlias2(): Unit = {
-    val fieldInfo = tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("pf3"), "name1"),
-        Alias(UnresolvedFieldReference("pf1"), "name2"),
-        Alias(UnresolvedFieldReference("pf2"), "name3")
-      ))
-
-    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
-    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
-  }
 }
 
-case class CClass(cf1: Int, cf2: String, cf3: Double)
 
-class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
-  def this() = this(0, "", 0.0)
-}

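For illustration, a minimal sketch of the by-name schema definition that the
tests above verify; the case class, the sample data, and the environment setup
are assumptions, not part of this commit:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    case class Event(id: Int, name: String, score: Double)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val stream = env.fromElements(Event(1, "a", 1.0))

    // fields are referenced by name: project, reorder, alias, and
    // append a processing-time attribute in one conversion
    val t = stream.toTable(tEnv, 'score as 'points, 'id, 'proctime.proctime)
    // resulting schema: (points: DOUBLE, id: INT, proctime: PROCTIME)
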
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
index 1fd58cc..bfa7bfa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentValidationTest.scala
@@ -30,77 +30,102 @@ import org.junit.Test
 
 class StreamTableEnvironmentValidationTest extends TableTestBase {
 
+  // ----------------------------------------------------------------------------------------------
+  // schema definition by position
+  // ----------------------------------------------------------------------------------------------
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtimeAliasByPosition(): Unit = {
+    val util = streamTestUtil()
+    // don't allow aliasing by position
+    util.addTable[(Long, Int, String, Int, Long)](('a as 'b).rowtime, 'b, 'c, 'd, 'e)
+  }
+
   @Test(expected = classOf[TableException])
-  def testInvalidTimeAttributes(): Unit = {
+  def testInvalidRowtimeAttributesByPosition(): Unit = {
     val util = streamTestUtil()
     // table definition makes no sense
     util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
-  def testInvalidProctimeAttribute(): Unit = {
+  def testInvalidProctimeAttributesByPosition(): Unit = {
+    val util = streamTestUtil()
+    // table definition makes no sense
+    util.addTable[(Long, Int, String, Int, Long)]('a.proctime.proctime, 'b, 'c, 'd, 'e)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidTimeAttributesByPosition(): Unit = {
+    val util = streamTestUtil()
+    // table definition makes no sense
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.proctime, 'b, 'c, 'd, 'e)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidProctimeAttributeByPosition(): Unit = {
     val util = streamTestUtil()
     // cannot replace an attribute with proctime
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
-  def testRowtimeAttributeReplaceFieldOfInvalidType(): Unit = {
+  def testRowtimeAttributeReplaceFieldOfInvalidTypeByPosition(): Unit = {
     val util = streamTestUtil()
     // cannot replace a non-time attribute with rowtime
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
   }
 
   @Test(expected = classOf[TableException])
-  def testRowtimeAndInvalidProctimeAttribute(): Unit = {
+  def testRowtimeAndInvalidProctimeAttributeByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'pt.proctime)
   }
 
   @Test(expected = classOf[TableException])
-  def testOnlyOneRowtimeAttribute1(): Unit = {
+  def testOnlyOneRowtimeAttribute1ByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 'rt.rowtime)
   }
 
   @Test(expected = classOf[TableException])
-  def testOnlyOneProctimeAttribute1(): Unit = {
+  def testOnlyOneProctimeAttribute1ByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt1.proctime, 'pt2.proctime)
   }
 
   @Test(expected = classOf[TableException])
-  def testRowtimeAttributeUsedName(): Unit = {
+  def testRowtimeAttributeUsedNameByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'a.rowtime)
   }
 
   @Test(expected = classOf[TableException])
-  def testProctimeAttributeUsedName(): Unit = {
+  def testProctimeAttributeUsedNameByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'b.proctime)
   }
 
   @Test(expected = classOf[TableException])
-  def testAsWithToManyFields(): Unit = {
+  def testAsWithTooManyFieldsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b, 'c, 'd)
   }
 
   @Test(expected = classOf[TableException])
-  def testAsWithAmbiguousFields(): Unit = {
+  def testAsWithAmbiguousFieldsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b, 'b)
   }
 
   @Test(expected = classOf[TableException])
-  def testOnlyFieldRefInAs(): Unit = {
+  def testOnlyFieldRefInAsByPosition(): Unit = {
     val util = streamTestUtil()
     util.addTable[(Int, Long, String)]('a, 'b as 'c, 'd)
   }
 
   @Test(expected = classOf[TableException])
-  def testInvalidTimeCharacteristic(): Unit = {
+  def testInvalidTimeCharacteristicByPosition(): Unit = {
     val data = List((1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"))
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -109,4 +134,57 @@ class StreamTableEnvironmentValidationTest extends TableTestBase {
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
     stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
   }
+
+  // ----------------------------------------------------------------------------------------------
+  // schema definition by name
+  // ----------------------------------------------------------------------------------------------
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasByName(): Unit = {
+    val util = streamTestUtil()
+    // we reference by name, but the field does not exist
+    util.addTable[(Long, Int, String, Int, Long)]('x as 'r)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidFieldByName(): Unit = {
+    val util = streamTestUtil()
+    // we reference fields by name, but 'x does not exist
+    util.addTable[(Long, Int, String, Int, Long)]('_1, '_2, 'x)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidField2ByName(): Unit = {
+    val util = streamTestUtil()
+    // we mix reference by position and by name
+    util.addTable[(Long, Int, String, Int, Long)]('x, '_1)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasWithProctimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    // alias in proctime not allowed
+    util.addTable[(Int, Long, String)]('_1, ('_2 as 'new).proctime, '_3)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidReplacingProctimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    // proctime must not replace an existing field
+    util.addTable[(Int, Long, String)]('_1, '_2.proctime, '_3)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasWithRowtimeAttribute(): Unit = {
+    val util = streamTestUtil()
+    // aliased field does not exist
+    util.addTable[(Int, Long, String)]('_1, ('newnew as 'new).rowtime, '_3)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasWithRowtimeAttribute2(): Unit = {
+    val util = streamTestUtil()
+    // aliased field has wrong type
+    util.addTable[(Int, Long, String)]('_1, ('_3 as 'new).rowtime, '_2)
+  }
 }

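A sketch of the time attribute rules these validation tests pin down, reusing
the streaming `env`/`tEnv` setup from the sketch above (the field names are
assumptions): a rowtime attribute may replace an existing long/timestamp field
or extend the schema, while a proctime attribute may only extend it:

    val stream = env.fromElements((1, 42L, "hello"))

    // OK by name: '_2 is a Long field and may carry the event time
    val withRowtime = stream.toTable(tEnv, '_1, '_2.rowtime, '_3)

    // OK by name: 'pt does not exist in the input, so the schema is extended
    val withProctime = stream.toTable(tEnv, '_1, '_2, '_3, 'pt.proctime)

    // TableException: proctime must not replace the existing field '_2
    // stream.toTable(tEnv, '_1, '_2.proctime, '_3)

    // TableException: '_3 is a String and cannot be replaced by rowtime
    // stream.toTable(tEnv, '_1, '_2, '_3.rowtime)
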
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
index 1e7ad61..156dd8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableEnvironmentValidationTest.scala
@@ -23,15 +23,16 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironmentTest.{CClass, PojoClass}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{CClass, PojoClass, TableEnvironment, TableException}
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.runtime.types.CRowTypeInfo
+import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert.assertTrue
 import org.junit._
 
-class TableEnvironmentValidationTest {
+class TableEnvironmentValidationTest extends TableTestBase {
 
   private val env = ExecutionEnvironment.getExecutionEnvironment
   private val tEnv = TableEnvironment.getTableEnvironment(env)
@@ -53,75 +54,60 @@ class TableEnvironmentValidationTest {
 
   val genericRowType = new GenericTypeInfo[Row](classOf[Row])
 
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasInRefByPosMode(): Unit = {
+    val util = batchTestUtil()
+    // all references must happen position-based
+    util.addTable('a, 'b, 'f2 as 'c)(tupleType)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidAliasOnAtomicType(): Unit = {
+    val util = batchTestUtil()
+    // alias not allowed
+    util.addTable('g as 'c)(atomicType)
+  }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoPojoNames1(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
+    val util = batchTestUtil()
+    // duplicate name
+    util.addTable('name1, 'name1, 'name3)(pojoType)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoAtomicName2(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2")
-      ))
+    val util = batchTestUtil()
+    // an atomic type may only be given a single name
+    util.addTable('name1, 'name2)(atomicType)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoTupleAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
+    val util = batchTestUtil()
+    // fields do not exist
+    util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(tupleType)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoCClassAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
+    val util = batchTestUtil()
+    // fields do not exist
+    util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(caseClassType)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoPojoAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoAtomicAlias(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        Alias(UnresolvedFieldReference("name1"), "name2")
-      ))
+    val util = batchTestUtil()
+    // fields do not exist
+    util.addTable('xxx as 'name1, 'yyy as 'name2, 'zzz as 'name3)(pojoType)
   }
 
   @Test(expected = classOf[TableException])
   def testGetFieldInfoGenericRowAlias(): Unit = {
-    tEnv.getFieldInfo(
-      genericRowType,
-      Array(UnresolvedFieldReference("first")))
+    val util = batchTestUtil()
+    // unsupported generic row type
+    util.addTable('first)(genericRowType)
   }
 
   @Test(expected = classOf[TableException])

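The mode selection that this file's first two tests exercise can be sketched as
follows, assuming a Scala tuple input and an existing batch `env`/`tEnv`: if
none of the referenced names matches an input field, all references are
interpreted by position and aliases are rejected; as soon as one name matches,
everything is resolved by name:

    val ds = env.fromElements((1, 2L, "hello"))   // input fields: _1, _2, _3

    // by position: no name matches an input field, so the fields are renamed
    val renamed = ds.toTable(tEnv, 'a, 'b, 'c)    // schema: (a, b, c)

    // by name: '_3 and '_1 match input fields, so the fields are
    // reordered and projected
    val reordered = ds.toTable(tEnv, '_3, '_1)    // schema: (_3, _1)

    // TableException: all names are new, so this is by-position mode,
    // where aliases are not allowed
    // ds.toTable(tEnv, 'a, 'b as 'c, 'd)
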
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 725c580..5295e7c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -63,14 +63,14 @@ class TableEnvironmentITCase(
   }
 
   @Test
-  def testRegisterWithFields(): Unit = {
+  def testRegisterWithFieldsByPosition(): Unit = {
 
     val tableName = "MyTable"
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c) // new names by position
     val t = tEnv.scan(tableName).select('a, 'b)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
@@ -81,6 +81,24 @@ class TableEnvironmentITCase(
   }
 
   @Test
+  def testRegisterWithFieldsByName(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds, '_3, '_1, '_2) // new order
+    val t = tEnv.scan(tableName).select('_1, '_2)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testTableRegister(): Unit = {
 
     val tableName = "MyTable"

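A short note on the by-name registration above: the registered schema keeps the
original field names in the requested order, so subsequent scans address the
reordered fields. A sketch, reusing `tEnv` and `ds` from the test:

    tEnv.registerDataSet("T", ds, '_3, '_1, '_2)
    // scan("T") schema: (_3: STRING, _1: INT, _2: LONG)
    val result = tEnv.scan("T").select('_1, '_2)
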
http://git-wip-us.apache.org/repos/asf/flink/blob/fb29898c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 804fad8..30b67e7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.api.{Table, TableEnvironment, TableSchema}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.junit.Assert.assertEquals
@@ -82,6 +82,12 @@ abstract class TableTestUtil {
 
   def verifyTable(resultTable: Table, expected: String): Unit
 
+  def verifySchema(resultTable: Table, fields: Seq[(String, TypeInformation[_])]): Unit = {
+    val actual = resultTable.getSchema
+    val expected = new TableSchema(fields.map(_._1).toArray, fields.map(_._2).toArray)
+    assertEquals(expected, actual)
+  }
+
   // the print methods are for debugging purposes only
   def printTable(resultTable: Table): Unit
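A minimal usage sketch for the new `verifySchema` helper; the `Types` constants
are an assumption for brevity:

    import org.apache.flink.table.api.Types

    val util = streamTestUtil()
    util.verifySchema(
      util.addTable[(Int, String)]('_2, '_1),
      Seq("_2" -> Types.STRING, "_1" -> Types.INT))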