You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/21 16:09:50 UTC

[2/2] flink git commit: [FLINK-8038] [table] Support map value constructor, cardinality, and item

[FLINK-8038] [table] Support map value constructor, cardinality, and item

This closes #5015.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5f5615c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5f5615c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5f5615c

Branch: refs/heads/master
Commit: c5f5615cf84026039614701b5e6b3b0e003eada0
Parents: 52599ff
Author: Rong Rong <ro...@uber.com>
Authored: Tue Nov 14 10:48:16 2017 -0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Nov 21 17:09:02 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sql.md                           |  45 +++++
 docs/dev/table/tableApi.md                      |  48 ++++-
 .../flink/table/api/scala/expressionDsl.scala   |  25 ++-
 .../flink/table/calcite/FlinkTypeFactory.scala  |  11 ++
 .../flink/table/codegen/CodeGenUtils.scala      |   5 +
 .../flink/table/codegen/CodeGenerator.scala     |  38 +++-
 .../table/codegen/calls/ScalarOperators.scala   |  64 +++++++
 .../table/expressions/ExpressionParser.scala    |   2 +
 .../table/expressions/ExpressionUtils.scala     |   4 +
 .../apache/flink/table/expressions/array.scala  |  60 -------
 .../flink/table/expressions/cardinality.scala   |  50 ++++++
 .../apache/flink/table/expressions/item.scala   |  76 ++++++++
 .../apache/flink/table/expressions/map.scala    |  76 ++++++++
 .../flink/table/plan/ProjectionTranslator.scala |   7 +
 .../flink/table/typeutils/TypeCheckUtils.scala  |   5 +-
 .../flink/table/validate/FunctionCatalog.scala  |  16 +-
 .../flink/table/expressions/ArrayTypeTest.scala |   9 +
 .../flink/table/expressions/MapTypeTest.scala   | 173 ++++++++++++++++++-
 .../table/expressions/SqlExpressionTest.scala   |   2 +
 .../expressions/utils/MapTypeTestBase.scala     |  23 ++-
 .../validation/MapTypeValidationTest.scala      |   2 +-
 21 files changed, 652 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3097d9e..50aa933 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2310,6 +2310,51 @@ ELEMENT(ARRAY)
   </tbody>
 </table>
 
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Map functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight text %}
+MAP ‘[’ key, value [, key, value ]* ‘]’
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates a map from a list of key-value pairs.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+CARDINALITY(MAP)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the number of entries of a map.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight text %}
+map ‘[’ key ‘]’
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the value specified by a particular key in a map.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
 ### Unsupported Functions
 
 The following functions are not supported yet:

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f5a2059..498dbbc 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1610,7 +1610,7 @@ rowInterval = composite , "." , "rows" ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
 
 as = composite , ".as(" , fieldReference , ")" ;
 
@@ -2950,6 +2950,52 @@ ARRAY.element()
 <table class="table table-bordered">
   <thead>
     <tr>
+      <th class="text-left" style="width: 40%">Map functions</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+
+    <tr>
+      <td>
+        {% highlight java %}
+map(ANY, ANY [, ANY, ANY ]*)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Creates a map from a list of key-value pairs.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+MAP.cardinality()
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the number of entries of a map.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        {% highlight java %}
+MAP.at(ANY)
+{% endhighlight %}
+      </td>
+      <td>
+        <p>Returns the value specified by a particular key in a map.</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
       <th class="text-left" style="width: 40%">Auxiliary functions</th>
       <th class="text-center">Description</th>
     </tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index b62e142..72a5561 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -683,19 +683,19 @@ trait ImplicitExpressionOperations {
   def flatten() = Flattening(expr)
 
   /**
-    * Accesses the element of an array based on an index (starting at 1).
+    * Accesses the element of an array or map based on a key or an index (starting at 1).
     *
-    * @param index position of the element (starting at 1)
+    * @param index key or position of the element (array index starting at 1)
     * @return value of the element
     */
-  def at(index: Expression) = ArrayElementAt(expr, index)
+  def at(index: Expression) = ItemAt(expr, index)
 
   /**
-    * Returns the number of elements of an array.
+    * Returns the number of elements of an array or number of entries of a map.
     *
-    * @return number of elements
+    * @return number of elements or entries
     */
-  def cardinality() = ArrayCardinality(expr)
+  def cardinality() = Cardinality(expr)
 
   /**
     * Returns the sole element of an array with a single element. Returns null if the array is
@@ -960,6 +960,19 @@ object array {
 }
 
 /**
+  * Creates a map of literals. The map will be a map between two objects (not primitives).
+  */
+object map {
+
+  /**
+    * Builds a map expression from a first key/value pair and any further expressions.
+    * The map will be a map between two objects (not primitives).
+    *
+    * @param key   expression for the first key
+    * @param value expression for the first value
+    * @param tail  further expressions, appended in the given order after the first pair
+    * @return a [[MapConstructor]] holding all given expressions
+    */
+  def apply(key: Expression, value: Expression, tail: Expression*): Expression =
+    MapConstructor(key +: value +: tail)
+}
+
+/**
   * Returns a value that is closer than any other value to pi.
   */
 object pi {

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 7bcdc0f..448029b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -222,6 +222,17 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     canonize(relType)
   }
 
+  /**
+    * Creates the map type for the given key and value types.
+    *
+    * The resulting [[MapRelDataType]] is backed by a [[MapTypeInfo]] derived from both
+    * component types, is created as not nullable, and is canonized before it is returned.
+    */
+  override def createMapType(keyType: RelDataType, valueType: RelDataType): RelDataType = {
+    val mapTypeInfo = new MapTypeInfo(
+      FlinkTypeFactory.toTypeInfo(keyType),
+      FlinkTypeFactory.toTypeInfo(valueType))
+    canonize(new MapRelDataType(mapTypeInfo, keyType, valueType, isNullable = false))
+  }
+
   override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
     val relType = new MultisetRelDataType(
       MultisetTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 161f9a3..d4ba902 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -198,6 +198,11 @@ object CodeGenUtils {
       throw new CodeGenException("Array expression type expected.")
     }
 
+  /**
+    * Ensures that the given expression yields a map.
+    *
+    * @throws CodeGenException if the result type of the expression is not a map type
+    */
+  def requireMap(genExpr: GeneratedExpression): Unit = {
+    val producesMap = TypeCheckUtils.isMap(genExpr.resultType)
+    if (!producesMap) {
+      throw new CodeGenException("Map expression type expected.")
+    }
+  }
+
   def requireInteger(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
       throw new CodeGenException("Integer expression type expected.")

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index a794b08..b51cdbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -957,6 +957,10 @@ abstract class CodeGenerator(
       case ARRAY_VALUE_CONSTRUCTOR =>
         generateArray(this, resultType, operands)
 
+      // maps
+      case MAP_VALUE_CONSTRUCTOR =>
+        generateMap(this, resultType, operands)
+
       case ITEM =>
         operands.head.resultType match {
           case _: ObjectArrayTypeInfo[_, _] |
@@ -975,9 +979,21 @@ abstract class CodeGenerator(
         }
 
       case CARDINALITY =>
-        val array = operands.head
-        requireArray(array)
-        generateArrayCardinality(nullCheck, array)
+        operands.head.resultType match {
+          case _: ObjectArrayTypeInfo[_, _] |
+               _: BasicArrayTypeInfo[_, _] |
+               _: PrimitiveArrayTypeInfo[_] =>
+            val array = operands.head
+            requireArray(array)
+            generateArrayCardinality(nullCheck, array)
+
+          case _: MapTypeInfo[_, _] =>
+            val map = operands.head
+            requireMap(map)
+            generateMapCardinality(nullCheck, map)
+
+          case _ => throw new CodeGenException("Expect an array or a map.")
+        }
 
       case ELEMENT =>
         val array = operands.head
@@ -1562,6 +1578,22 @@ abstract class CodeGenerator(
   }
 
   /**
+    * Registers a reusable map field in the member area of the generated [[Function]].
+    *
+    * The field is declared as a raw `java.util.Map` and initialized with a new
+    * `java.util.HashMap`.
+    *
+    * @param clazz currently not used by this method
+    * @return the generated name of the map field
+    */
+  def addReusableMap(clazz: Class[_]): String = {
+    val mapField = newName("map")
+    val declaration =
+      s"""
+         |final java.util.Map $mapField =
+         |    new java.util.HashMap();
+         |""".stripMargin
+    reusableMemberStatements.add(declaration)
+    mapField
+  }
+
+  /**
     * Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]].
     */
   def addReusableTimestamp(): String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index bd5b1f7..522d826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -188,6 +188,13 @@ object ScalarOperators {
         (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)"
       }
     }
+    // map types: java.util.Map has no static equals(a, b) helper (unlike java.util.Arrays),
+    // so the generated code calls the instance method on the left operand
+    else if (isMap(left.resultType) &&
+      left.resultType.getTypeClass == right.resultType.getTypeClass) {
+      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+      }
+    }
     // comparable types of same type
     else if (isComparable(left.resultType) && left.resultType == right.resultType) {
       generateComparison("==", nullCheck, left, right)
@@ -229,6 +236,13 @@ object ScalarOperators {
         (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)"
       }
     }
+    // map types: java.util.Map has no static equals(a, b) helper (unlike java.util.Arrays),
+    // so the generated code negates the instance method called on the left operand
+    else if (isMap(left.resultType) &&
+      left.resultType.getTypeClass == right.resultType.getTypeClass) {
+      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+        (leftTerm, rightTerm) => s"!$leftTerm.equals($rightTerm)"
+      }
+    }
     // comparable types
     else if (isComparable(left.resultType) && left.resultType == right.resultType) {
       generateComparison("!=", nullCheck, left, right)
@@ -1061,6 +1075,47 @@ object ScalarOperators {
     GeneratedExpression(resultTerm, nullTerm, operatorCode, Types.STRING)
   }
 
+  /**
+    * Generates code that fills a reusable `java.util.Map` member with the given entries.
+    *
+    * @param codeGenerator generator that provides the reusable map member, the output boxing
+    *                      and the null-check setting
+    * @param resultType    type of the constructed map
+    * @param elements      generated expressions in the order key, value, key, value, ...
+    * @return expression whose result term is the reusable map; its null term is NEVER_NULL
+    */
+  def generateMap(
+      codeGenerator: CodeGenerator,
+      resultType: TypeInformation[_],
+      elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+    val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass)
+
+    val boxedElements: Seq[GeneratedExpression] = resultType match {
+      case mti: MapTypeInfo[_, _] =>
+        // we box the elements to also represent null values
+        val boxedKeyTypeTerm = boxedTypeTermForTypeInfo(mti.getKeyTypeInfo)
+        val boxedValueTypeTerm = boxedTypeTermForTypeInfo(mti.getValueTypeInfo)
+
+        elements.zipWithIndex.map { case (element, idx) =>
+          val boxedExpr = codeGenerator.generateOutputFieldBoxing(element)
+          val exprOrNull: String = if (codeGenerator.nullCheck) {
+            // even positions hold keys, odd positions hold values
+            val boxedTypeTerm = if (idx % 2 == 0) boxedKeyTypeTerm else boxedValueTypeTerm
+            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+          } else {
+            boxedExpr.resultTerm
+          }
+          boxedExpr.copy(resultTerm = exprOrNull)
+        }
+    }
+
+    // the map is a member field that is reused by the generated function, so it has to be
+    // emptied first; otherwise entries written by an earlier evaluation stay in the result
+    val clearCode = s"$mapTerm.clear();\n"
+
+    val putCode = boxedElements.grouped(2)
+      .map { case Seq(key, value) =>
+        s"""
+           |${key.code}
+           |${value.code}
+           |$mapTerm.put(${key.resultTerm}, ${value.resultTerm});
+           |""".stripMargin
+      }
+      .mkString("\n")
+
+    val code = clearCode + putCode
+
+    GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+  }
+
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,
@@ -1091,6 +1146,15 @@ object ScalarOperators {
     GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
   }
 
+  /**
+    * Generates code that yields the number of entries of a map.
+    *
+    * @param nullCheck whether null checking is enabled; passed on to the unary operator helper
+    * @param map       generated expression that evaluates to the map
+    * @return generated expression of type INT
+    */
+  def generateMapCardinality(
+      nullCheck: Boolean,
+      map: GeneratedExpression)
+  : GeneratedExpression = {
+    generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) {
+      // java.util.Map exposes its size through the size() method, not through a field
+      (operandTerm) => s"${map.resultTerm}.size()"
+    }
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 201679b..aa82464 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -86,6 +86,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val FALSE: Keyword = Keyword("false")
   lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
   lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
+  lazy val MAP: Keyword = Keyword("MAP")
   lazy val BYTE: Keyword = Keyword("BYTE")
   lazy val SHORT: Keyword = Keyword("SHORT")
   lazy val INTERVAL_MONTHS: Keyword = Keyword("INTERVAL_MONTHS")
@@ -141,6 +142,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val dataType: PackratParser[TypeInformation[_]] =
     PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
     OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+    MAP ~ "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { mt => Types.MAP(mt._1._1, mt._2)} |
     BYTE ^^ { e => Types.BYTE } |
     SHORT ^^ { e => Types.SHORT } |
     INTERVAL_MONTHS ^^ { e => Types.INTERVAL_MONTHS } |

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 08abc8f..3b52ab4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -138,6 +138,10 @@ object ExpressionUtils {
     }
   }
 
+  /**
+    * Converts a map of expressions into a [[MapConstructor]] by listing every entry as
+    * its key followed by its value.
+    */
+  private[flink] def convertMap(map: Map[Expression, Expression]): Expression = {
+    val keysAndValues = map.toSeq.flatMap { case (k, v) => Seq(k, v) }
+    MapConstructor(keysAndValues)
+  }
+
   // ----------------------------------------------------------------------------------------------
   // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
   // ----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 3288478..c43bddd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -62,66 +62,6 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
   }
 }
 
-case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = Seq(array, index)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
-  }
-
-  override def toString = s"($array).at($index)"
-
-  override private[flink] def resultType = array.resultType match {
-    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
-    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-  }
-
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case ati: TypeInformation[_] if isArray(ati)  =>
-        if (index.resultType == INT_TYPE_INFO) {
-          // check for common user mistake
-          index match {
-            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
-              ValidationFailure(
-                s"Array element access needs an index starting at 1 but was $value.")
-            case _ => ValidationSuccess
-          }
-        } else {
-          ValidationFailure(
-            s"Array element access needs an integer index but was '${index.resultType}'.")
-        }
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-}
-
-case class ArrayCardinality(array: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = Seq(array)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
-  }
-
-  override def toString = s"($array).cardinality()"
-
-  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-}
-
 case class ArrayElement(array: Expression) extends Expression {
 
   override private[flink] def children: Seq[Expression] = Seq(array)

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
new file mode 100644
index 0000000..aaf52b0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Returns the number of elements of an array or the number of entries of a map.
+  *
+  * @param container expression that must evaluate to an array or a map
+  */
+case class Cardinality(container: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(container)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
+  }
+
+  override def toString = s"($container).cardinality()"
+
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  /**
+    * Accepts map and array inputs and rejects every other type.
+    */
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+      case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+      // maps are accepted as well, so the message must not claim that only arrays are expected
+      case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
new file mode 100644
index 0000000..75a1224
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Accesses a single element of an array (by index) or of a map (by key).
+  *
+  * @param container expression that yields the array or map
+  * @param key       array position or map key to look up
+  */
+case class ItemAt(container: Expression, key: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(container, key)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    rexBuilder.makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+  }
+
+  override def toString = s"($container).at($key)"
+
+  // value type for maps, component type for the supported array types
+  override private[flink] def resultType = container.resultType match {
+    case mapType: MapTypeInfo[_, _] => mapType.getValueTypeInfo
+    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
+    case basicArray: BasicArrayTypeInfo[_, _] => basicArray.getComponentInfo
+    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+      case arrayType: TypeInformation[_] if isArray(arrayType) => validateArrayIndex()
+      case mapType: MapTypeInfo[_, _] => validateMapKey(mapType)
+      case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+    }
+  }
+
+  // array access: the index must be an integer, and an integer literal must be at least 1
+  private def validateArrayIndex(): ValidationResult = {
+    if (key.resultType != INT_TYPE_INFO) {
+      ValidationFailure(
+        s"Array element access needs an integer index but was '${key.resultType}'.")
+    } else {
+      // check for common user mistake
+      key match {
+        case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+          ValidationFailure(
+            s"Array element access needs an index starting at 1 but was $value.")
+        case _ => ValidationSuccess
+      }
+    }
+  }
+
+  // map access: the key expression must have exactly the key type of the map
+  private def validateMapKey(mapType: MapTypeInfo[_, _]): ValidationResult = {
+    val expectedKeyType = mapType.getKeyTypeInfo
+    if (key.resultType == expectedKeyType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"Map key-value access needs a valid key of type " +
+          s"'$expectedKeyType', found '${key.resultType}'.")
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
new file mode 100644
index 0000000..bf71401
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Expression that builds a map from a flat sequence of alternating key and value expressions.
+  *
+  * The key type is taken from the first element and the value type from the last element.
+  * `validateInput()` checks that the sequence is not empty, has an even number of elements,
+  * and that all keys and all values share those two types.
+  *
+  * @param elements key and value expressions in the order key, value, key, value, ...
+  */
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  // Map type with generic key and value types; it is not read anywhere inside this class.
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  /** Translates this expression into a call of Calcite's map value constructor. */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    // both the key and the value type are created as nullable types
+    val keyRelType =
+      typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true)
+    val valueRelType =
+      typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true)
+    val mapRelType = typeFactory.createMapType(keyRelType, valueRelType)
+    val operands = elements.map(_.toRexNode).toList.asJava
+    relBuilder.getRexBuilder.makeCall(
+      mapRelType,
+      SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR,
+      operands)
+  }
+
+  /** Renders the map as `map([k1: v1], [k2: v2], ...)`. */
+  override def toString: String = {
+    val pairs = elements.grouped(2).map(pair => pair.mkString("[", ": ", "]"))
+    pairs.mkString("map(", ", ", ")")
+  }
+
+  /** Map type made of the type of the first element (key) and of the last element (value). */
+  override private[flink] def resultType: TypeInformation[_] =
+    new MapTypeInfo(elements.head.resultType, elements.last.resultType)
+
+  /**
+    * Checks, in this order, that the map is not empty, that the elements form complete
+    * key-value pairs, that all keys have the type of the first key, and that all values
+    * have the type of the last value. The first violated rule determines the failure.
+    */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      ValidationFailure("Empty maps are not supported yet.")
+    } else if (elements.size % 2 != 0) {
+      ValidationFailure("Maps must have even number of elements to form key value pairs.")
+    } else if (elements.grouped(2).exists(_.head.resultType != elements.head.resultType)) {
+      ValidationFailure("Not all key elements of the map literal have the same type.")
+    } else if (elements.grouped(2).exists(_.last.resultType != elements.last.resultType)) {
+      ValidationFailure("Not all value elements of the map literal have the same type.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index dfb44b1..97cd9cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -174,6 +174,13 @@ object ProjectionTranslator {
             replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames))
         c.makeCopy(Array(newArgs))
 
+      // map constructor
+      case c @ MapConstructor(args) =>
+        val newArgs = c.elements
+          .map((exp: Expression) =>
+            replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames))
+        c.makeCopy(Array(newArgs))
+
       // General expression
       case e: Expression =>
         val newArgs = e.productIterator.map {

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 00627ad..fa7abab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.typeutils
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
 import org.apache.flink.table.validate._
 
@@ -79,6 +79,9 @@ object TypeCheckUtils {
     case _ => false
   }
 
+  /** Checks whether the given type information is a `MapTypeInfo`. */
+  def isMap(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: MapTypeInfo[_, _] => true
+    case _ => false
+  }
+
   def isComparable(dataType: TypeInformation[_]): Boolean =
     classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 535fd6e..120bf54 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -237,12 +237,19 @@ object FunctionCatalog {
     "dateTimePlus" -> classOf[Plus],
     "dateFormat" -> classOf[DateFormat],
 
+    // item
+    "at" -> classOf[ItemAt],
+
+    // cardinality
+    "cardinality" -> classOf[Cardinality],
+
     // array
     "array" -> classOf[ArrayConstructor],
-    "cardinality" -> classOf[ArrayCardinality],
-    "at" -> classOf[ArrayElementAt],
     "element" -> classOf[ArrayElement],
 
+    // map
+    "map" -> classOf[MapConstructor],
+
     // window properties
     "start" -> classOf[WindowStart],
     "end" -> classOf[WindowEnd],
@@ -330,9 +337,12 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
     SqlStdOperatorTable.VAR_SAMP,
     // ARRAY OPERATORS
     SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+    SqlStdOperatorTable.ELEMENT,
+    // MAP OPERATORS
+    SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR,
+    // ARRAY MAP SHARED OPERATORS
     SqlStdOperatorTable.ITEM,
     SqlStdOperatorTable.CARDINALITY,
-    SqlStdOperatorTable.ELEMENT,
     // SPECIAL OPERATORS
     SqlStdOperatorTable.ROW,
     SqlStdOperatorTable.OVERLAPS,

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index 536f6ba..28023dd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -295,4 +295,13 @@ class ArrayTypeTest extends ArrayTypeTestBase {
       "f11 <> f9",
       "false")
   }
+
+  @Test
+  def testArrayTypeCasting(): Unit = {
+    // Casts f3 to an object array of SQL dates, written once as a Scala expression and once
+    // as a string expression; the last argument is the expected rendering of the result.
+    testTableApi(
+      'f3.cast(Types.OBJECT_ARRAY(Types.SQL_DATE)),
+      "f3.cast(OBJECT_ARRAY(SQL_DATE))",
+      "[1984-03-12, 1984-02-10]"
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index b2b8016..a2f2a0b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -18,17 +18,178 @@
 
 package org.apache.flink.table.expressions
 
+import java.sql.Date
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.MapTypeTestBase
 import org.junit.Test
 
 class MapTypeTest extends MapTypeTestBase {
 
   @Test
-  def testItem(): Unit = {
-    testSqlApi("f0['map is null']", "null")
-    testSqlApi("f1['map is empty']", "null")
-    testSqlApi("f2['b']", "13")
-    testSqlApi("f3[1]", "null")
-    testSqlApi("f3[12]", "a")
+  def testMapLiteral(): Unit = {
+    // Argument order of testAllApis, judging by the syntax of the arguments: Scala expression,
+    // string expression, SQL expression, expected result. testTableApi has no SQL argument.
+
+    // primitive literals
+    testAllApis(map(1, 1), "map(1, 1)", "MAP[1, 1]", "{1=1}")
+
+    testAllApis(
+      map(true, true),
+      "map(true, true)",
+      "map[TRUE, TRUE]",
+      "{true=true}")
+
+    // object literals
+    testTableApi(map(BigDecimal(1), BigDecimal(1)), "map(1p, 1p)", "{1=1}")
+
+    // maps used as key and as value
+    testAllApis(
+      map(map(1, 2), map(3, 4)),
+      "map(map(1, 2), map(3, 4))",
+      "MAP[MAP[1, 2], MAP[3, 4]]",
+      "{{1=2}={3=4}}")
+
+    // computed entries: the pairs are 3 -> 9 and 2 -> 2, the expected string lists key 2 first
+    testAllApis(
+      map(1 + 2, 3 * 3, 6 / 3, 4 - 2),
+      "map(1 + 2, 3 * 3, 6 / 3, 4 - 2)",
+      "map[1 + 2, 3 * 3, 6 / 3, 4 - 2]",
+      "{2=2, 3=9}")
+
+    // null as value; the SQL variant produces the null with NULLIF(1,1)
+    testAllApis(
+      map(1, Null(Types.INT)),
+      "map(1, Null(INT))",
+      "map[1, NULLIF(1,1)]",
+      "{1=null}")
+
+    // explicit conversion
+    testAllApis(
+      map(1, 2L , 3, 4L),
+      "map(1, 2L, 3, 4L)",
+      "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)]",
+      "{1=2, 3=4}")
+
+    testAllApis(
+      map(Date.valueOf("1985-04-11"), 1),
+      "map('1985-04-11'.toDate, 1)",
+      "MAP[DATE '1985-04-11', 1]",
+      "{1985-04-11=1}")
+
+    testAllApis(
+      map(BigDecimal(2.0002), BigDecimal(2.0003)),
+      "map(2.0002p, 2.0003p)",
+      "MAP[CAST(2.0002 AS DECIMAL), CAST(2.0003 AS DECIMAL)]",
+      "{2.0002=2.0003}")
+  }
+
+  @Test
+  def testMapField(): Unit = {
+    // maps constructed from fields of the test row
+    testAllApis(
+      map('f4, 'f5),
+      "map(f4, f5)",
+      "MAP[f4, f5]",
+      "{foo=12}")
+
+    testAllApis(
+      map('f4, 'f1),
+      "map(f4, f1)",
+      "MAP[f4, f1]",
+      "{foo={}}")
+
+    testAllApis(
+      map('f2, 'f3),
+      "map(f2, f3)",
+      "MAP[f2, f3]",
+      "{{a=12, b=13}={12=a, 13=b}}")
+
+    // the lookup in f1 is expected to yield null, which is then used as the key
+    testAllApis(
+      map('f1.at("a"), 'f5),
+      "map(f1.at('a'), f5)",
+      "MAP[f1['a'], f5]",
+      "{null=12}")
+
+    // plain map fields
+    testAllApis(
+      'f1,
+      "f1",
+      "f1",
+      "{}")
+
+    testAllApis(
+      'f2,
+      "f2",
+      "f2",
+      "{a=12, b=13}")
+
+    // item access on map fields
+    testAllApis(
+      'f2.at("a"),
+      "f2.at('a')",
+      "f2['a']",
+      "12")
+
+    testAllApis(
+      'f3.at(12),
+      "f3.at(12)",
+      "f3[12]",
+      "a")
+
+    // chained item access on a constructed map: 'foo' selects f3, 13 selects "b"
+    testAllApis(
+      map('f4, 'f3).at("foo").at(13),
+      "map(f4, f3).at('foo').at(13)",
+      "MAP[f4, f3]['foo'][13]",
+      "b")
+  }
+
+  @Test
+  def testMapOperations(): Unit = {
+
+    // comparison of a plain field with a value looked up in a map
+    testAllApis(
+      'f5 === 'f2.at("a"),
+      "f5 === f2.at('a')",
+      "f5 = f2['a']",
+      "true")
+
+    // item access on the field that is set to null in the test data
+    testAllApis(
+      'f0.at("map is null"),
+      "f0.at('map is null')",
+      "f0['map is null']",
+      "null")
+
+    // item access on an empty map
+    testAllApis(
+      'f1.at("map is empty"),
+      "f1.at('map is empty')",
+      "f1['map is empty']",
+      "null")
+
+    // item access with an existing string key
+    testAllApis(
+      'f2.at("b"),
+      "f2.at('b')",
+      "f2['b']",
+      "13")
+
+    // item access with an integer key that is not contained in the map
+    testAllApis(
+      'f3.at(1),
+      "f3.at(1)",
+      "f3[1]",
+      "null")
+
+    // item access with an existing integer key
+    testAllApis(
+      'f3.at(12),
+      "f3.at(12)",
+      "f3[12]",
+      "a")
+  }
+
+  @Test
+  def testMapTypeCasting(): Unit = {
+    // Casts f2 to MAP(STRING, INT), written once as a Scala expression and once as a string
+    // expression; the expected result is the unchanged content of the map.
+    testTableApi(
+      'f2.cast(Types.MAP(Types.STRING, Types.INT)),
+      "f2.cast(MAP(STRING, INT))",
+      "{a=12, b=13}"
+    )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index c631212..7bdfee0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -156,6 +156,8 @@ class SqlExpressionTest extends ExpressionTestBase {
     // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
     testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
     testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
+    testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2")
+    testSqlApi("MAP['k1', CAST(true AS VARCHAR(256)), 'k2', 'foo']['k1']", "true")
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
index d90d9df..a5df7ec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
@@ -22,22 +22,26 @@ import java.util.{HashMap => JHashMap}
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.Types
 import org.apache.flink.types.Row
 
 class MapTypeTestBase extends ExpressionTestBase {
 
   override def testData: Any = {
-    val testData = new Row(4)
-    testData.setField(0, null)
-    testData.setField(1, new JHashMap[String, Int]())
-    val map = new JHashMap[String, Int]()
-    map.put("a", 12)
-    map.put("b", 13)
-    testData.setField(2, map)
+    // String -> Int map, stored in field 2
+    val map1 = new JHashMap[String, Int]()
+    map1.put("a", 12)
+    map1.put("b", 13)
+    // Int -> String map, stored in field 3
     val map2 = new JHashMap[Int, String]()
     map2.put(12, "a")
     map2.put(13, "b")
+    // field 0 is null, field 1 an empty map, fields 4 to 6 hold a String, an Int and an array
+    val testData = new Row(7)
+    testData.setField(0, null)
+    testData.setField(1, new JHashMap[String, Int]())
+    testData.setField(2, map1)
     testData.setField(3, map2)
+    testData.setField(4, "foo")
+    testData.setField(5, 12)
+    // NOTE(review): Array(1.2, 1.3) is a primitive double array, while typeInfo declares this
+    // field as Types.OBJECT_ARRAY(Types.DOUBLE) - confirm that this is intended
+    testData.setField(6, Array(1.2, 1.3))
     testData
   }
 
@@ -46,7 +50,10 @@ class MapTypeTestBase extends ExpressionTestBase {
       new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
       new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
       new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-      new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+      new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+      Types.STRING,
+      Types.INT,
+      Types.OBJECT_ARRAY(Types.DOUBLE)
     ).asInstanceOf[TypeInformation[Any]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
index 3f8306d..ae85b0d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
@@ -26,6 +26,6 @@ class MapTypeValidationTest extends MapTypeTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testWrongKeyType(): Unit = {
-    testSqlApi("f4[12]", "FAIL")
+    // Accesses f2 with an integer key; the annotation on this test expects a
+    // ValidationException. Presumably f2 is the map with String keys from MapTypeTestBase.
+    testSqlApi("f2[12]", "FAIL")
   }
 }