You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/12/04 09:06:24 UTC

flink git commit: [FLINK-8104] [table] Add ROW value constructor

Repository: flink
Updated Branches:
  refs/heads/master 4bd0ef1b5 -> e1d656621


[FLINK-8104] [table] Add ROW value constructor

This closes #5040.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1d65662
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1d65662
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1d65662

Branch: refs/heads/master
Commit: e1d656621d8b291087652fee15d577d43d22f2d7
Parents: 4bd0ef1
Author: Rong Rong <ro...@uber.com>
Authored: Mon Nov 20 13:47:50 2017 -0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Dec 4 10:06:03 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  59 +++++++--
 docs/dev/table/tableApi.md                      |  92 ++++++++++++++
 .../flink/table/api/scala/expressionDsl.scala   |  17 ++-
 .../flink/table/calcite/FlinkTypeFactory.scala  |   8 +-
 .../flink/table/codegen/CodeGenerator.scala     |  34 +++++
 .../table/codegen/calls/ScalarOperators.scala   |  71 ++++++-----
 .../flink/table/expressions/collection.scala    |  31 ++++-
 .../flink/table/validate/FunctionCatalog.scala  |   3 +
 .../flink/table/expressions/RowTypeTest.scala   | 125 +++++++++++++++++++
 .../table/expressions/SqlExpressionTest.scala   |   6 +-
 .../expressions/utils/RowTypeTestBase.scala     |  64 ++++++++++
 .../validation/RowTypeValidationTest.scala      |  42 +++++++
 .../table/runtime/batch/sql/CalcITCase.scala    |  27 ++++
 .../table/runtime/batch/table/CalcITCase.scala  |  24 ++++
 14 files changed, 554 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 50aa933..e5de70a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2257,7 +2257,7 @@ tableName.compositeType.*
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left" style="width: 40%">Array functions</th>
+      <th class="text-left" style="width: 40%">Value constructor functions</th>
       <th class="text-center">Description</th>
     </tr>
   </thead>
@@ -2267,53 +2267,54 @@ tableName.compositeType.*
     <tr>
       <td>
         {% highlight text %}
-ARRAY ‘[’ value [, value ]* ‘]’
+(value, [, value]*)
 {% endhighlight %}
       </td>
       <td>
-        <p>Creates an array from a list of values.</p>
+        <p>Creates a row from a list of values.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
-CARDINALITY(ARRAY)
+ROW(value, [, value]*)
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns the number of elements of an array.</p>
+        <p>Creates a row from a list of values.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
-array ‘[’ index ‘]’
+ARRAY ‘[’ value [, value ]* ‘]’
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns the element at a particular position in an array. The index starts at 1.</p>
+        <p>Creates an array from a list of values.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
-ELEMENT(ARRAY)
+MAP ‘[’ key, value [, key, value ]* ‘]’
 {% endhighlight %}
       </td>
       <td>
-        <p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
+        <p>Creates a map from a list of key-value pairs.</p>
       </td>
     </tr>
+
   </tbody>
 </table>
 
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left" style="width: 40%">Map functions</th>
+      <th class="text-left" style="width: 40%">Array functions</th>
       <th class="text-center">Description</th>
     </tr>
   </thead>
@@ -2323,17 +2324,51 @@ ELEMENT(ARRAY)
     <tr>
       <td>
         {% highlight text %}
-MAP ‘[’ key, value [, key, value ]* ‘]’
+CARDINALITY(ARRAY)
 {% endhighlight %}
       </td>
       <td>
-        <p>Creates a map from a list of key-value pairs.</p>
+        <p>Returns the number of elements of an array.</p>
       </td>
     </tr>
 
     <tr>
       <td>
         {% highlight text %}
+array ‘[’ index ‘]’
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the element at a particular position in an array. The index starts at 1.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+ELEMENT(ARRAY)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the sole element of an array with a single element. Returns <code>null</code> if the array is empty. Throws an exception if the array has more than one element.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Map functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight text %}
 CARDINALITY(MAP)
 {% endhighlight %}
       </td>

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 498dbbc..8b6fa72 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2996,6 +2996,30 @@ MAP.at(ANY)
 <table class="table table-bordered">
   <thead>
     <tr>
+      <th class="text-left" style="width: 40%">Row functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight java %}
+row(ANY, [, ANY]*)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates a row from a list of values. Row is composite type and can be access via <a href="tableApi.html#built-in-functions">value access functions</a>.</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
       <th class="text-left" style="width: 40%">Auxiliary functions</th>
       <th class="text-center">Description</th>
     </tr>
@@ -4287,7 +4311,75 @@ ARRAY.element()
   </tbody>
 </table>
 
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Map functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+map(ANY, ANY [, ANY, ANY ]*)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates a map from a list of key-value pairs.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+MAP.cardinality()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the number of entries of a map.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+MAP.at(ANY)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the value specified by a particular key in a map.</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Row functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
 
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight scala %}
+row(ANY, [, ANY]*)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates a row from a list of values. Row is composite type and can be access via <a href="tableApi.html#built-in-functions">value access functions</a>.</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
 
 <table class="table table-bordered">
   <thead>

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 2708b5c..d8dbdb6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -960,12 +960,25 @@ object array {
 }
 
 /**
-  * Creates a map of literals. The map will be a map between two objects (not primitives).
+  * Creates a row of expressions.
+  */
+object row {
+
+  /**
+    * Creates a row of expressions.
+    */
+  def apply(head: Expression, tail: Expression*): Expression = {
+    RowConstructor(head +: tail.toSeq)
+  }
+}
+
+/**
+  * Creates a map of expressions. The map will be a map between two objects (not primitives).
   */
 object map {
 
   /**
-    * Creates a map of literals. The map will be a map between two objects (not primitives).
+    * Creates a map of expressions. The map will be a map between two objects (not primitives).
     */
   def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
     MapConstructor(Seq(key, value) ++ tail.toSeq)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 448029b..db7ffdb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -434,8 +434,12 @@ object FlinkTypeFactory {
       val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
       compositeRelDataType.compositeType
 
-    // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
-    case ROW | CURSOR => new NothingTypeInfo
+    case ROW if relDataType.isInstanceOf[RelRecordType] =>
+      val relRecordType = relDataType.asInstanceOf[RelRecordType]
+      new RowSchema(relRecordType).typeInfo
+
+    // CURSOR for UDTF case, whose type info will never be used, just a placeholder
+    case CURSOR => new NothingTypeInfo
 
     case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
       val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index b253a27..b98936f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.ROW
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo._
@@ -954,6 +955,10 @@ abstract class CodeGenerator(
         requireString(left)
         generateArithmeticOperator("+", nullCheck, resultType, left, right)
 
+      // rows
+      case ROW =>
+        generateRow(this, resultType, operands)
+
       // arrays
       case ARRAY_VALUE_CONSTRUCTOR =>
         generateArray(this, resultType, operands)
@@ -1306,6 +1311,20 @@ abstract class CodeGenerator(
     }
   }
 
+  private[flink] def generateNullableOutputBoxing(
+      element: GeneratedExpression,
+      typeInfo: TypeInformation[_])
+    : GeneratedExpression = {
+    val boxedExpr = generateOutputFieldBoxing(element)
+    val boxedTypeTerm = boxedTypeTermForTypeInfo(typeInfo)
+    val exprOrNull: String = if (nullCheck) {
+      s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+    } else {
+      boxedExpr.resultTerm
+    }
+    boxedExpr.copy(resultTerm = exprOrNull)
+  }
+
   private[flink] def generateStreamRecordRowtimeAccess(): GeneratedExpression = {
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
@@ -1555,6 +1574,21 @@ abstract class CodeGenerator(
   }
 
   /**
+    * Adds a reusable [[org.apache.flink.types.Row]]
+    * to the member area of the generated [[Function]].
+    */
+  def addReusableRow(arity: Int): String = {
+    val fieldTerm = newName("row")
+    val fieldRow =
+      s"""
+         |final org.apache.flink.types.Row $fieldTerm =
+         |    new org.apache.flink.types.Row($arity);
+         |""".stripMargin
+    reusableMemberStatements.add(fieldRow)
+    fieldTerm
+  }
+
+  /**
     * Adds a reusable array to the member area of the generated [[Function]].
     */
   def addReusableArray(clazz: Class[_], size: Int): String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 7fb8eba..b7e02ae 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
@@ -849,6 +850,41 @@ object ScalarOperators {
     generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
   }
 
+  def generateRow(
+      codeGenerator: CodeGenerator,
+      resultType: TypeInformation[_],
+      elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+    val rowTerm = codeGenerator.addReusableRow(resultType.getArity)
+
+    val boxedElements: Seq[GeneratedExpression] = resultType match {
+      case ct: RowTypeInfo => // should always be RowTypeInfo
+        if (resultType.getArity == elements.size) {
+          elements.zipWithIndex.map {
+            case (e, idx) => codeGenerator.generateNullableOutputBoxing(e,
+              ct.getTypeAt(idx))
+          }
+        } else {
+          throw new CodeGenException(s"Illegal row generation operation. " +
+            s"Expected row arity ${resultType.getArity} but was ${elements.size}.")
+        }
+      case _ => throw new CodeGenException(s"Unsupported row generation operation. " +
+        s"Expected RowTypeInfo but was $resultType.")
+    }
+
+    val code = boxedElements
+      .zipWithIndex
+      .map { case (element, idx) =>
+        s"""
+           |${element.code}
+           |$rowTerm.setField($idx, ${element.resultTerm});
+           |""".stripMargin
+      }
+      .mkString("\n")
+
+    GeneratedExpression(rowTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+  }
+
   def generateArray(
       codeGenerator: CodeGenerator,
       resultType: TypeInformation[_],
@@ -857,21 +893,11 @@ object ScalarOperators {
     val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
 
     val boxedElements: Seq[GeneratedExpression] = resultType match {
-
+      // we box the elements to also represent null values
       case oati: ObjectArrayTypeInfo[_, _] =>
-        // we box the elements to also represent null values
-        val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
         elements.map { e =>
-          val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
-          val exprOrNull: String = if (codeGenerator.nullCheck) {
-            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
-          } else {
-            boxedExpr.resultTerm
-          }
-          boxedExpr.copy(resultTerm = exprOrNull)
+          codeGenerator.generateNullableOutputBoxing(e, oati.getComponentInfo)
         }
-
       // no boxing necessary
       case _: PrimitiveArrayTypeInfo[_] => elements
     }
@@ -1085,22 +1111,9 @@ object ScalarOperators {
 
     val boxedElements: Seq[GeneratedExpression] = resultType match {
       case mti: MapTypeInfo[_, _] =>
-        // we box the elements to also represent null values
-        val boxedKeyTypeTerm = boxedTypeTermForTypeInfo(mti.getKeyTypeInfo)
-        val boxedValueTypeTerm = boxedTypeTermForTypeInfo(mti.getValueTypeInfo)
-
-        elements.zipWithIndex.map { case (element, idx) =>
-          val boxedExpr = codeGenerator.generateOutputFieldBoxing(element)
-          val exprOrNull: String = if (codeGenerator.nullCheck) {
-            if (idx % 2 == 0) {
-              s"${boxedExpr.nullTerm} ? null : ($boxedKeyTypeTerm) ${boxedExpr.resultTerm}"
-            } else {
-              s"${boxedExpr.nullTerm} ? null : ($boxedValueTypeTerm) ${boxedExpr.resultTerm}"
-            }
-          } else {
-            boxedExpr.resultTerm
-          }
-          boxedExpr.copy(resultTerm = exprOrNull)
+        elements.zipWithIndex.map { case (e, idx) =>
+          codeGenerator.generateNullableOutputBoxing(e,
+            if (idx % 2 == 0) mti.getKeyTypeInfo else mti.getValueTypeInfo)
         }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
index 3b65ee4..951ae27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -23,13 +23,42 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
 import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.table.calcite.FlinkRelBuilder
 import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
 import scala.collection.JavaConverters._
 
+case class RowConstructor(elements: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val relDataType = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+      .createTypeFromTypeInfo(resultType, isNullable = false)
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.ROW, values)
+  }
+
+  override def toString = s"row(${elements.mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new RowTypeInfo(
+    elements.map(e => e.resultType):_*
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty rows are not supported yet.")
+    }
+    ValidationSuccess
+  }
+}
+
 case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
 
   override private[flink] def children: Seq[Expression] = elements

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 120bf54..281633e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -250,6 +250,9 @@ object FunctionCatalog {
     // map
     "map" -> classOf[MapConstructor],
 
+    // row
+    "row" -> classOf[RowConstructor],
+
     // window properties
     "start" -> classOf[WindowStart],
     "end" -> classOf[WindowEnd],

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
new file mode 100644
index 0000000..abe3ae2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Date
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.RowTypeTestBase
+import org.junit.Test
+
+class RowTypeTest extends RowTypeTestBase {
+
+  @Test
+  def testRowLiteral(): Unit = {
+
+    // primitive literal
+    testAllApis(
+      row(1, "foo", true),
+      "row(1, 'foo', true)",
+      "ROW(1, 'foo', true)",
+      "1,foo,true")
+
+    // special literal
+    testAllApis(
+      row(Date.valueOf("1985-04-11"),
+        BigDecimal("0.1").bigDecimal,
+        array(1, 2, 3),
+        map("foo", "bar"),
+        row(1, true)
+      ),
+      "row('1985-04-11'.toDate, 0.1p, Array(1, 2, 3), " +
+        "Map('foo', 'bar'), row(1, true))",
+      "ROW(DATE '1985-04-11', CAST(0.1 AS DECIMAL), ARRAY[1, 2, 3], " +
+        "MAP['foo', 'bar'], row(1, true))",
+      "1985-04-11,0.1,[1, 2, 3],{foo=bar},1,true") // string faltten
+
+    testAllApis(
+      row(1 + 1, 2 * 3, Null(Types.STRING)),
+      "row(1 + 1, 2 * 3, Null(STRING))",
+      "ROW(1 + 1, 2 * 3, NULLIF(1,1))",
+      "2,6,null"
+    )
+
+    testSqlApi("(1, 'foo', true)", "1,foo,true")
+  }
+
+  @Test
+  def testRowField(): Unit = {
+    testAllApis(
+      row('f0, 'f1),
+      "row(f0, f1)",
+      "(f0, f1)",
+      "null,1"
+    )
+
+    testAllApis(
+      'f2,
+      "f2",
+      "f2",
+      "2,foo,true"
+    )
+
+    testAllApis(
+      row('f2, 'f5),
+      "row(f2, f5)",
+      "(f2, f5)",
+      "2,foo,true,foo,null"
+    )
+
+    testAllApis(
+      'f4,
+      "f4",
+      "f4",
+      "1984-03-12,0E-8,[1, 2, 3]"
+    )
+
+    testAllApis(
+      row('f1, "foo", true),
+      "row(f1, 'foo', true)",
+      "(f1, 'foo',true)",
+      "1,foo,true"
+    )
+  }
+
+  @Test
+  def testRowOperations(): Unit = {
+    testAllApis(
+      'f5.get("f0"),
+      "f5.get('f0')",
+      "f5.f0",
+      "foo"
+    )
+
+    testAllApis(
+      'f3.get("f1").get("f2"),
+      "f3.get('f1').get('f2')",
+      "f3.f1.f2",
+      "true"
+    )
+
+    // SQL API for row value constructor follow by field access is not supported
+    testTableApi(
+      row('f1, 'f6, 'f2).get("f1").get("f1"),
+      "row(f1, f6, f2).get('f1').get('f1')",
+      "null"
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 7bdfee0..e6bfcdc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -151,9 +151,9 @@ class SqlExpressionTest extends ExpressionTestBase {
 
   @Test
   def testValueConstructorFunctions(): Unit = {
-    // TODO we need a special code path that flattens ROW types
-    // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
-    // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
+    testSqlApi("ROW('hello world', 12)", "hello world,12")
+    testSqlApi("('hello world', 12)", "hello world,12")
+    testSqlApi("('foo', ('bar', 12))", "foo,bar,12")
     testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
     testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
     testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2")

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
new file mode 100644
index 0000000..f8f10a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/RowTypeTestBase.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.Date
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.Types
+import org.apache.flink.types.Row
+
+class RowTypeTestBase extends ExpressionTestBase {
+
+  override def testData: Any = {
+    val row = new Row(3)
+    row.setField(0, 2)
+    row.setField(1, "foo")
+    row.setField(2, true)
+    val nestedRow = new Row(2)
+    nestedRow.setField(0, 3)
+    nestedRow.setField(1, row)
+    val specialTypeRow = new Row(3)
+    specialTypeRow.setField(0, Date.valueOf("1984-03-12"))
+    specialTypeRow.setField(1, BigDecimal("0.00000000").bigDecimal)
+    specialTypeRow.setField(2, Array(1, 2, 3))
+    val testData = new Row(7)
+    testData.setField(0, null)
+    testData.setField(1, 1)
+    testData.setField(2, row)
+    testData.setField(3, nestedRow)
+    testData.setField(4, specialTypeRow)
+    testData.setField(5, Row.of("foo", null))
+    testData.setField(6, Row.of(null, null))
+    testData
+  }
+
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      Types.STRING,
+      Types.INT,
+      Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN),
+      Types.ROW(Types.INT, Types.ROW(Types.INT, Types.STRING, Types.BOOLEAN)),
+      Types.ROW(Types.SQL_DATE, Types.DECIMAL, ObjectArrayTypeInfo.getInfoFor(Types.INT)),
+      Types.ROW(Types.STRING, Types.BOOLEAN),
+      Types.ROW(Types.STRING, Types.STRING)
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
new file mode 100644
index 0000000..94e8394
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/RowTypeValidationTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.validation
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.RowTypeTestBase
+import org.junit.Test
+
+class RowTypeValidationTest extends RowTypeTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testEmptyRowType(): Unit = {
+    testAllApis("FAIL", "row()", "Row()", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNullRowType(): Unit = {
+    testAllApis("FAIL", "row(null)", "Row(NULL)", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSqlRowIllegalAccess(): Unit = {
+    testAllApis('f5.get("f2"), "f5.get('f2')", "f5.f2", "FAIL")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 71df4e6..3529b5f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, Ta
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -309,6 +310,32 @@ class CalcITCase(
   }
 
   @Test
+  def testValueConstructor(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT (a, b, c), ARRAY[12, b], MAP[a, c] FROM MyTable " +
+      "WHERE (a, b, c) = ('foo', 12, TIMESTAMP '1984-07-12 14:34:24')"
+    val rowValue = ("foo", 12, Timestamp.valueOf("1984-07-12 14:34:24"))
+
+    val ds = env.fromElements(rowValue)
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+    val result = tEnv.sqlQuery(sqlQuery)
+
+    val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 14:34:24.0}"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+    // Compare actual object to avoid undetected Calcite flattening
+    val resultRow = results.asJava.get(0)
+    assertEquals(rowValue._1, resultRow.getField(0).asInstanceOf[Row].getField(0))
+    assertEquals(rowValue._2, resultRow.getField(1).asInstanceOf[Array[Integer]](1))
+    assertEquals(rowValue._3,
+      resultRow.getField(2).asInstanceOf[util.Map[String, Timestamp]].get(rowValue._1))
+  }
+
+  @Test
   def testUserDefinedScalarFunction(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1d65662/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 22373d2..5ef7e31 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
 import org.apache.flink.types.Row
 import org.junit._
+import org.junit.Assert.assertEquals
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
@@ -471,6 +472,29 @@ class CalcITCase(
   }
 
   @Test
+  def testValueConstructor(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val rowValue = ("foo", 12, Timestamp.valueOf("1984-07-12 14:34:24"))
+
+    val table = env.fromElements(rowValue).toTable(tEnv, 'a, 'b, 'c)
+
+    val result = table.select(row('a, 'b, 'c), array(12, 'b), map('a, 'c))
+
+    val expected = "foo,12,1984-07-12 14:34:24.0,[12, 12],{foo=1984-07-12 14:34:24.0}"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+
+    // Compare actual object to avoid undetected Calcite flattening
+    val resultRow = results.asJava.get(0)
+    assertEquals(rowValue._1, resultRow.getField(0).asInstanceOf[Row].getField(0))
+    assertEquals(rowValue._2, resultRow.getField(1).asInstanceOf[Array[Integer]](1))
+    assertEquals(rowValue._3,
+      resultRow.getField(2).asInstanceOf[util.Map[String, Timestamp]].get(rowValue._1))
+  }
+
+  @Test
   def testMultipleUserDefinedScalarFunctions(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)