You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/11/22 08:19:21 UTC

flink git commit: [FLINK-4294] [table] Allow access of composite type fields

Repository: flink
Updated Branches:
  refs/heads/master fdb134cab -> e45096867


[FLINK-4294] [table] Allow access of composite type fields

This closes #2319.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4509686
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4509686
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4509686

Branch: refs/heads/master
Commit: e45096867a9542a756443310a535a971a6a29695
Parents: fdb134c
Author: twalthr <tw...@apache.org>
Authored: Mon Aug 1 14:15:49 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Nov 22 09:15:09 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  84 +++++++-
 .../flink/api/scala/table/expressionDsl.scala   |  23 +++
 .../flink/api/table/FlinkPlannerImpl.scala      |   4 +-
 .../flink/api/table/FlinkTypeFactory.scala      |  29 ++-
 .../flink/api/table/codegen/CodeGenerator.scala |  39 +++-
 .../table/expressions/ExpressionParser.scala    |  23 ++-
 .../flink/api/table/expressions/composite.scala | 106 ++++++++++
 .../api/table/expressions/fieldExpression.scala |   6 +-
 .../api/table/plan/ProjectionTranslator.scala   |  29 ++-
 .../api/table/plan/logical/LogicalNode.scala    |   2 +-
 .../api/table/plan/logical/operators.scala      |   1 +
 .../flink/api/table/plan/nodes/FlinkRel.scala   |  25 ++-
 .../plan/schema/CompositeRelDataType.scala      |  83 ++++++++
 .../org/apache/flink/api/table/table.scala      |  12 +-
 .../flink/api/java/batch/sql/SqlITCase.java     |   2 +-
 .../scala/batch/sql/TableWithSQLITCase.scala    |  18 ++
 .../api/table/CompositeFlatteningTest.scala     | 146 ++++++++++++++
 .../table/expressions/CompositeAccessTest.scala | 192 +++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala   |   3 +-
 .../expressions/utils/ExpressionTestBase.scala  |   1 -
 20 files changed, 795 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index d017337..bb0e500 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1470,7 +1470,7 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
 | `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH`    | `java.lang.Integer`    |
 | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
 
-Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row but can not be accessed yet. They are treated like a black box within Table API and SQL.
+Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row. Generic types and arrays are treated as a black box within Table API and SQL yet. Composite types, however, are fully supported types where fields of a composite type can be accessed using the `.get()` operator in Table API and dot operator (e.g. `MyTable.pojoColumn.myField`) in SQL. Composite types can also be flattened using `.flatten()` in Table API or `MyTable.pojoColumn.*` in SQL.
 
 {% top %}
 
@@ -2004,6 +2004,29 @@ NUMERIC.rows
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. In most cases the fields of the flat representation are named similarly to the original fields but with a dollar separator (e.g. <code>mypojo$mytuple$f0</code>).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+COMPOSITE.get(STRING)
+COMPOSITE.get(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index or name and returns it's value. E.g. <code>pojo.get('myField')</code> or <code>tuple.get(0)</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 
@@ -2531,6 +2554,29 @@ NUMERIC.rows
       </td>
     </tr>
 
+    <tr>
+      <td>
+        {% highlight scala %}
+ANY.flatten()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. In most cases the fields of the flat representation are named similarly to the original fields but with a dollar separator (e.g. <code>mypojo$mytuple$f0</code>).</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+COMPOSITE.get(STRING)
+COMPOSITE.get(INT)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index or name and returns it's value. E.g. <code>'pojo.get("myField")</code> or <code>'tuple.get(0)</code>.</p>
+      </td>
+    </tr>
+
   </tbody>
 </table>
 </div>
@@ -3298,6 +3344,8 @@ CAST(value AS type)
   </tbody>
 </table>
 
+
+<!-- Disabled temporarily in favor of composite type support
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -3330,6 +3378,7 @@ ROW (value [, value]* )
     </tr>
   </tbody>
 </table>
+-->
 
 <table class="table table-bordered">
   <thead>
@@ -3551,6 +3600,39 @@ MIN(value)
   </tbody>
 </table>
 
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Value access functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td>
+        {% highlight text %}
+tableName.compositeType.field
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and returns it's value.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+tableName.compositeType.*
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 836db3e..fee43d8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -431,6 +431,29 @@ trait ImplicitExpressionOperations {
     */
   def rows = toRowInterval(expr)
 
+  /**
+    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and
+    * returns it's value.
+    *
+    * @param name name of the field (similar to Flink's field expressions)
+    * @return value of the field
+    */
+  def get(name: String) = GetCompositeField(expr, name)
+
+  /**
+    * Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by index and
+    * returns it's value.
+    *
+    * @param index position of the field
+    * @return value of the field
+    */
+  def get(index: Int) = GetCompositeField(expr, index)
+
+  /**
+    * Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes
+    * into a flat representation where every subtype is a separate field.
+    */
+  def flatten() = Flattening(expr)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index 97e5cf2..131cdc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -103,7 +103,9 @@ class FlinkPlannerImpl(
       val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
         new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
       root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
-      root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+      // we disable automatic flattening in order to let composite types pass without modification
+      // we might enable it again once Calcite has better support for structured types
+      // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
       root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
       root
     } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 1f607e4..ee71ce9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -22,14 +22,15 @@ import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
 import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
-import org.apache.flink.api.table.plan.schema.GenericRelDataType
+import org.apache.flink.api.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
 import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
 
@@ -41,6 +42,9 @@ import scala.collection.mutable
   */
 class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
 
+  // NOTE: for future data types it might be necessary to
+  // override more methods of RelDataTypeFactoryImpl
+
   private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
 
   def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    // TODO add specific RelDataTypes
-    // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    // TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
+      composite
+    case _ =>
+      super.createTypeWithNullability(relDataType, nullable)
+  }
 }
 
 object FlinkTypeFactory {
@@ -147,6 +164,10 @@ object FlinkTypeFactory {
       val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
       genericRelDataType.typeInfo
 
+    case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
+      val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
+      compositeRelDataType.compositeType
+
     case _@t =>
       throw TableException(s"Type is not supported: $t")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index bbcd70f..d40e0e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -608,9 +608,40 @@ class CodeGenerator(
     generateInputAccess(input._1, input._2, index)
   }
 
-  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression =
-    throw new CodeGenException("Accesses to fields are not supported yet.")
+  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = {
+    val refExpr = rexFieldAccess.getReferenceExpr.accept(this)
+    val index = rexFieldAccess.getField.getIndex
+    val fieldAccessExpr = generateFieldAccess(refExpr.resultType, refExpr.resultTerm, index)
 
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(fieldAccessExpr.resultType)
+    val defaultValue = primitiveDefaultValue(fieldAccessExpr.resultType)
+    val resultCode = if (nullCheck) {
+      s"""
+        |${refExpr.code}
+        |$resultTypeTerm $resultTerm;
+        |boolean $nullTerm;
+        |if (${refExpr.nullTerm}) {
+        |  $resultTerm = $defaultValue;
+        |  $nullTerm = true;
+        |}
+        |else {
+        |  ${fieldAccessExpr.code}
+        |  $resultTerm = ${fieldAccessExpr.resultTerm};
+        |  $nullTerm = ${fieldAccessExpr.nullTerm};
+        |}
+        |""".stripMargin
+    } else {
+      s"""
+        |${refExpr.code}
+        |${fieldAccessExpr.code}
+        |$resultTypeTerm $resultTerm = ${fieldAccessExpr.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, fieldAccessExpr.resultType)
+  }
 
   override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
     val resultType = FlinkTypeFactory.toTypeInfo(literal.getType)
@@ -1014,13 +1045,13 @@ class CodeGenerator(
   }
 
   private def generateFieldAccess(
-      inputType: TypeInformation[Any],
+      inputType: TypeInformation[_],
       inputTerm: String,
       index: Int)
     : GeneratedExpression = {
     inputType match {
       case ct: CompositeType[_] =>
-        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]]) {
+        val fieldIndex = if (ct.isInstanceOf[PojoTypeInfo[_]] && inputPojoFieldMapping.nonEmpty) {
           inputPojoFieldMapping.get(index)
         }
         else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index a438c1c..6b6c129 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -84,6 +84,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val MILLI: Keyword = Keyword("milli")
   lazy val ROWS: Keyword = Keyword("rows")
   lazy val STAR: Keyword = Keyword("*")
+  lazy val GET: Keyword = Keyword("get")
+  lazy val FLATTEN: Keyword = Keyword("flatten")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -277,11 +279,21 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val suffixRowInterval : PackratParser[Expression] =
     composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
 
+  lazy val suffixGet: PackratParser[Expression] =
+    composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+        GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val suffixFlattening: PackratParser[Expression] =
+    composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
+
   lazy val suffixed: PackratParser[Expression] =
     suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
       suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
       suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
       suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
+      suffixGet | suffixFlattening |
       suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
 
   // prefix operators
@@ -350,10 +362,19 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
   }
 
+  lazy val prefixGet: PackratParser[Expression] =
+    GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+        GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+    FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
+
   lazy val prefixed: PackratParser[Expression] =
     prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixStart | prefixEnd |
       prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
-      prefixFloor | prefixCeil |
+      prefixFloor | prefixCeil | prefixGet | prefixFlattening |
       prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
 
   // suffix/prefix composite

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
new file mode 100644
index 0000000..ee1eb46
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Flattening of composite types. All flattenings are resolved into
+  * `GetCompositeField` expressions.
+  */
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check for composite type
+    if (!child.resultType.isInstanceOf[CompositeType[_]]) {
+      return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
+    }
+    val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
+
+    // check key
+    key match {
+      case name: String =>
+        val index = compositeType.getFieldIndex(name)
+        if (index < 0) {
+          ValidationFailure(s"Field name '$name' could not be found.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case index: Int =>
+        if (index >= compositeType.getArity) {
+          ValidationFailure(s"Field index '$index' exceeds arity.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case _ =>
+        ValidationFailure(s"Invalid key '$key'.")
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeFieldAccess(child.toRexNode, fieldIndex.get)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(child, key).asInstanceOf[this.type]
+  }
+
+  /**
+    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+    */
+  private[flink] def aliasName(): Option[String] = child match {
+    case gcf: GetCompositeField =>
+      val alias = gcf.aliasName()
+      if (alias.isDefined) {
+        Some(s"${alias.get}$$$key")
+      } else {
+        None
+      }
+    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index 91efd08..c7817bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -103,13 +103,13 @@ case class Alias(child: Expression, name: String)
 case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
 
   override private[flink] def name: String =
-    throw new UnresolvedException("Invalid call to name on UnresolvedAlias")
+    throw UnresolvedException("Invalid call to name on UnresolvedAlias")
 
   override private[flink] def toAttribute: Attribute =
-    throw new UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+    throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
 
   override private[flink] def resultType: TypeInformation[_] =
-    throw new UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+    throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
 
   override private[flink] lazy val valid = false
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
index d09b03e..cd22f6a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.table.plan
 
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical.LogicalNode
+import org.apache.flink.api.table.plan.logical.{LogicalNode, Project}
 
 import scala.collection.mutable.ListBuffer
 
@@ -159,11 +160,35 @@ object ProjectionTranslator {
   /**
     * Expands an UnresolvedFieldReference("*") to parent's full project list.
     */
-  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[Expression] = {
+  def expandProjectList(
+      exprs: Seq[Expression],
+      parent: LogicalNode,
+      tableEnv: TableEnvironment)
+    : Seq[Expression] = {
+
     val projectList = new ListBuffer[Expression]
+
     exprs.foreach {
       case n: UnresolvedFieldReference if n.name == "*" =>
         projectList ++= parent.output.map(a => UnresolvedFieldReference(a.name))
+
+      case Flattening(unresolved) =>
+        // simulate a simple project to resolve fields using current parent
+        val project = Project(Seq(UnresolvedAlias(unresolved)), parent).validate(tableEnv)
+        val resolvedExpr = project
+          .output
+          .headOption
+          .getOrElse(throw new RuntimeException("Could not find resolved composite."))
+        resolvedExpr.validateInput()
+        val newProjects = resolvedExpr.resultType match {
+          case ct: CompositeType[_] =>
+            (0 until ct.getArity).map { idx =>
+              projectList += GetCompositeField(unresolved, ct.getFieldNames()(idx))
+            }
+          case _ =>
+            projectList += unresolved
+        }
+
       case e: Expression => projectList += e
     }
     projectList

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
index 55fba07..21290d4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.plan.logical
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{TableEnvironment, ValidationException}
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.trees.TreeNode
 import org.apache.flink.api.table.typeutils.TypeCoercion

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 1d7ed5f..ecf1996 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -45,6 +45,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
             case ne: NamedExpression => ne
             case expr if !expr.valid => u
             case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
+            case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
             case other => Alias(other, s"_c$i")
           }
           case _ =>

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
index a4c7589..7932e11 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
@@ -35,19 +35,32 @@ trait FlinkRel {
     localExprsTable: Option[List[RexNode]]): String = {
 
     expr match {
-      case i: RexInputRef => inFields.get(i.getIndex)
-      case l: RexLiteral => l.toString
+      case i: RexInputRef =>
+        inFields.get(i.getIndex)
+
+      case l: RexLiteral =>
+        l.toString
+
       case l: RexLocalRef if localExprsTable.isEmpty =>
-        throw new IllegalArgumentException("Encountered RexLocalRef without local expression table")
+        throw new IllegalArgumentException("Encountered RexLocalRef without " +
+          "local expression table")
+
       case l: RexLocalRef =>
         val lExpr = localExprsTable.get(l.getIndex)
         getExpressionString(lExpr, inFields, localExprsTable)
-      case c: RexCall => {
+
+      case c: RexCall =>
         val op = c.getOperator.toString
         val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
         s"$op(${ops.mkString(", ")})"
-      }
-      case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
+
+      case fa: RexFieldAccess =>
+        val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
+        val field = fa.getField.getName
+        s"$referenceExpr.$field"
+
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
new file mode 100644
index 0000000..b9ceff0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/CompositeRelDataType.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.FlinkTypeFactory
+import org.apache.flink.api.table.plan.schema.CompositeRelDataType.createFieldList
+
+import scala.collection.JavaConverters._
+
+/**
+  * Composite type for encapsulating Flink's [[CompositeType]].
+  *
+  * @param compositeType CompositeType to encapsulate
+  * @param typeFactory Flink's type factory
+  */
+class CompositeRelDataType(
+    val compositeType: CompositeType[_],
+    typeFactory: FlinkTypeFactory)
+  extends RelRecordType(createFieldList(compositeType, typeFactory)) {
+
+  override def toString = s"COMPOSITE($compositeType)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[CompositeRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: CompositeRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        compositeType == that.compositeType
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    compositeType.hashCode()
+  }
+
+}
+
+object CompositeRelDataType {
+
+  /**
+    * Converts the fields of a composite type to list of [[RelDataTypeField]].
+    */
+  private def createFieldList(
+      compositeType: CompositeType[_],
+      typeFactory: FlinkTypeFactory)
+    : util.List[RelDataTypeField] = {
+
+    compositeType
+      .getFieldNames
+      .zipWithIndex
+      .map { case (name, index) =>
+        new RelDataTypeFieldImpl(
+          name,
+          index,
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+            .asInstanceOf[RelDataTypeField]
+      }
+      .toList
+      .asJava
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 8528c8a..c45e871 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -55,7 +55,7 @@ import scala.collection.JavaConverters._
   * syntax.
   *
   * @param tableEnv The [[TableEnvironment]] to which the table is bound.
-  * @param logicalPlan
+  * @param logicalPlan logical representation
   */
 class Table(
     private[flink] val tableEnv: TableEnvironment,
@@ -77,7 +77,7 @@ class Table(
     */
   def select(fields: Expression*): Table = {
 
-    val expandedFields = expandProjectList(fields, logicalPlan)
+    val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
     val (projection, aggs, props) = extractAggregationsAndProperties(expandedFields, tableEnv)
 
     if (props.nonEmpty) {
@@ -549,11 +549,9 @@ class Table(
     * }}}
     */
   def orderBy(fields: Expression*): Table = {
-    val order: Seq[Ordering] = fields.map { case e =>
-      e match {
-        case o: Ordering => o
-        case _ => Asc(e)
-      }
+    val order: Seq[Ordering] = fields.map {
+      case o: Ordering => o
+      case e => Asc(e)
     }
     new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
index 5f50517..1364cbd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/sql/SqlITCase.java
@@ -52,7 +52,7 @@ public class SqlITCase extends TableProgramsTestBase {
 		Table result = tableEnv.sql(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		resultSet.print();
+
 		List<Row> results = resultSet.collect();
 		String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
 			"2,Hello,true,1944-02-24,12.6666666650000000\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
index 30e44f0..a770a6e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/TableWithSQLITCase.scala
@@ -96,4 +96,22 @@ class TableWithSQLITCase(
     val results = result2.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testSelectWithCompositeType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT MyTable.a2, MyTable.a1._2 FROM MyTable"
+
+    val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "Hello,true\n"
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
new file mode 100644
index 0000000..f14b9d8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
+import org.apache.flink.api.table.functions.ScalarFunction
+import org.apache.flink.api.table.utils.TableTestBase
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+
+class CompositeFlatteningTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testDuplicateFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    table.select('a.flatten(), 'a.flatten())
+  }
+
+  @Test
+  def testMultipleFlatteningsTable(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "c",
+        "b._1 AS b$_1",
+        "b._2 AS b$_2"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testMultipleFlatteningsSql(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS _1",
+        "a._2 AS _2",
+        "c",
+        "b._1 AS _10",
+        "b._2 AS _20"
+      )
+    )
+
+    util.verifySql(
+      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
+      expected)
+  }
+
+  @Test
+  def testNestedFlattenings(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+    val result = table.select('a.flatten(), 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "b"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testScalarFunctionAccess(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+    val result = table.select(
+      giveMeCaseClass().get("my"),
+      giveMeCaseClass().get("clazz"),
+      giveMeCaseClass().flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
+        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
+        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
+        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object CompositeFlatteningTest {
+
+  case class TestCaseClass(my: String, clazz: Int)
+
+  object giveMeCaseClass extends ScalarFunction {
+    def eval(): TestCaseClass = {
+      TestCaseClass("hello", 42)
+    }
+
+    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+      createTypeInformation[TestCaseClass]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
new file mode 100644
index 0000000..3121c58
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, Types, ValidationException}
+import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Test
+
+
+class CompositeAccessTest extends ExpressionTestBase {
+
+  @Test
+  def testGetField(): Unit = {
+
+    // single field by string key
+    testAllApis(
+      'f0.get("intField"),
+      "f0.get('intField')",
+      "testTable.f0.intField",
+      "42")
+
+    testSqlApi("testTable.f0.stringField", "Bob")
+
+    testSqlApi("testTable.f0.booleanField", "true")
+
+    // single field by int key
+    testTableApi(
+      'f0.get(0),
+      "f0.get(0)",
+      "42")
+
+    // nested single field
+    testAllApis(
+      'f1.get("objectField").get("intField"),
+      "f1.get('objectField').get('intField')",
+      "testTable.f1.objectField.intField",
+      "25")
+
+    testSqlApi("testTable.f1.objectField.stringField", "Timo")
+
+    testSqlApi("testTable.f1.objectField.booleanField", "false")
+
+    testAllApis(
+      'f2.get(0),
+      "f2.get(0)",
+      "testTable.f2._1",
+      "a")
+
+    testSqlApi("testTable.f3.f1", "b")
+
+    testSqlApi("testTable.f4.myString", "Hello")
+
+    testSqlApi("testTable.f5", "13")
+
+    testAllApis(
+      'f7.get("_1"),
+      "get(f7, '_1')",
+      "testTable.f7._1",
+      "true")
+
+    // composite field return type
+    testSqlApi("testTable.f6", "MyCaseClass2(null)")
+
+    testAllApis(
+      'f1.get("objectField"),
+      "f1.get('objectField')",
+      "testTable.f1.objectField",
+      "MyCaseClass(25,Timo,false)")
+
+    testAllApis(
+      'f0,
+      "f0",
+      "testTable.f0",
+      "MyCaseClass(42,Bob,true)")
+
+    // flattening (test base only returns first column)
+    testAllApis(
+      'f1.get("objectField").flatten(),
+      "f1.get('objectField').flatten()",
+      "testTable.f1.objectField.*",
+      "25")
+
+    testAllApis(
+      'f0.flatten(),
+      "flatten(f0)",
+      "testTable.f0.*",
+      "42")
+
+    testTableApi(12.flatten(), "12.flatten()", "12")
+
+    testTableApi('f5.flatten(), "f5.flatten()", "13")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongSqlField(): Unit = {
+    testSqlApi("testTable.f5.test", "13")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField(): Unit = {
+    testTableApi('f0.get(555), "'fail'", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField2(): Unit = {
+    testTableApi("fail", "f0.get(555)", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField(): Unit = {
+    testTableApi('f0.get("fghj"), "'fail'", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField2(): Unit = {
+    testTableApi("fail", "f0.get('fghj')", "fail")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def testData = {
+    val testData = new Row(8)
+    testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+    testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+    testData.setField(2, ("a", "b"))
+    testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+    testData.setField(4, new MyPojo())
+    testData.setField(5, 13)
+    testData.setField(6, MyCaseClass2(null))
+    testData.setField(7, Tuple1(true))
+    testData
+  }
+
+  def typeInfo = {
+    new RowTypeInfo(Seq(
+      createTypeInformation[MyCaseClass],
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[(String, String)],
+      new TupleTypeInfo(Types.STRING, Types.STRING),
+      TypeExtractor.createTypeInfo(classOf[MyPojo]),
+      Types.INT,
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[Tuple1[Boolean]]
+      )).asInstanceOf[TypeInformation[Any]]
+  }
+
+}
+
+object CompositeAccessTest {
+  case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+  case class MyCaseClass2(objectField: MyCaseClass)
+
+  class MyPojo {
+    private var myInt: Int = 0
+    private var myString: String = "Hello"
+
+    def getMyInt = myInt
+
+    def setMyInt(value: Int) = {
+      myInt = value
+    }
+
+    def getMyString = myString
+
+    def setMyString(value: String) = {
+      myString = myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
index cae4388..b892cfb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/SqlExpressionTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.Row
 import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.junit.Test
+import org.junit.{Ignore, Test}
 
 /**
   * Tests all SQL expressions that are currently supported according to the documentation.
@@ -135,6 +135,7 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("CAST(2 AS DOUBLE)", "2.0")
   }
 
+  @Ignore // TODO we need a special code path that flattens ROW types
   @Test
   def testValueConstructorFunctions(): Unit = {
     testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0

http://git-wip-us.apache.org/repos/asf/flink/blob/e4509686/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index 6720759..d34e335 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.table.expressions.utils
 
-import org.apache.calcite.rel.logical.LogicalProject
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator