Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:32 UTC

[25/50] [abbrv] flink git commit: [FLINK-6377] [table] Add additional map tests

[FLINK-6377] [table] Add additional map tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d49efbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d49efbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d49efbd2

Branch: refs/heads/table-retraction
Commit: d49efbd222c1aa963f3f9a7fb3cf359071d1bbd3
Parents: 5b6e71c
Author: twalthr <tw...@apache.org>
Authored: Wed Apr 26 18:04:00 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Sun Apr 30 18:59:05 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  1 +
 .../org/apache/flink/table/api/Types.scala      | 12 +++-
 .../flink/table/codegen/ExpressionReducer.scala |  7 +-
 .../flink/table/plan/nodes/FlinkRelNode.scala   |  5 +-
 .../flink/table/expressions/MapTypeTest.scala   | 72 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 022a73d..2b777c6 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1747,6 +1747,7 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
 | `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
 | `Types.PRIMITIVE_ARRAY`| `ARRAY`                     | e.g. `int[]`           |
 | `Types.OBJECT_ARRAY`   | `ARRAY`                     | e.g. `java.lang.Byte[]`|
+| `Types.MAP`            | `MAP`                       | `java.util.HashMap`    |
 
 
 Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row. 
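
The row added above documents that a field declared with Types.MAP is exposed as the SQL MAP type and is physically represented by a java.util.HashMap. A minimal, illustrative sketch of the two levels (not part of the commit; key and value types chosen arbitrarily):

    import java.util.{HashMap => JHashMap}
    import org.apache.flink.table.api.Types
    import org.apache.flink.types.Row

    val scores = new JHashMap[String, Int]()
    scores.put("a", 12)

    val row = new Row(1)
    row.setField(0, scores)                             // physical value: a java.util.HashMap
    val fieldType = Types.MAP(Types.STRING, Types.INT)  // declared Table API type of field 0, SQL type MAP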

http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
index f22fa32..2152b72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.api
 
 import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.types.Row
 
@@ -100,4 +100,14 @@ object Types {
   def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
     ObjectArrayTypeInfo.getInfoFor(elementType)
   }
+
+  /**
+    * Generates type information for a Java HashMap.
+    *
+    * @param keyType type of the keys of the map e.g. Types.STRING
+    * @param valueType type of the values of the map e.g. Types.STRING
+    */
+  def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
+    new MapTypeInfo(keyType, valueType)
+  }
 }
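
A hypothetical usage of the new helper (not part of the commit), declaring a row schema with a STRING field and a MAP<STRING, INT> field; the concrete key and value types are only for illustration:

    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.Types

    val rowType = new RowTypeInfo(
      Types.STRING,
      Types.MAP(Types.STRING, Types.INT)  // keyType / valueType as described in the Scaladoc above
    )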

http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 3fcbdc1..b7e1335 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -65,7 +65,10 @@ class ExpressionReducer(config: TableConfig)
         )
 
       // we don't support object literals yet, we skip those constant expressions
-      case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) | (SqlTypeName.ARRAY, _) => None
+      case (SqlTypeName.ANY, _) |
+           (SqlTypeName.ROW, _) |
+           (SqlTypeName.ARRAY, _) |
+           (SqlTypeName.MAP, _) => None
 
       case (_, e) => Some(e)
     }
@@ -103,7 +106,7 @@ class ExpressionReducer(config: TableConfig)
       val unreduced = constExprs.get(i)
       unreduced.getType.getSqlTypeName match {
         // we insert the original expression for object literals
-        case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
+        case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY | SqlTypeName.MAP =>
           reducedValues.add(unreduced)
         case _ =>
           val reducedValue = reduced.getField(reducedIdx)

http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 7554ea9..0b244e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -93,7 +93,10 @@ trait FlinkRelNode extends RelNode {
     case SqlTypeName.ARRAY =>
       // 16 is an arbitrary estimate
       estimateDataTypeSize(t.getComponentType) * 16
-    case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate
+    case SqlTypeName.MAP =>
+      // 16 is an arbitrary estimate
+      (estimateDataTypeSize(t.getKeyType) + estimateDataTypeSize(t.getValueType)) * 16
+    case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
     case _ => throw TableException(s"Unsupported data type encountered: $t")
   }
 }
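
A worked illustration of the new estimate (the per-type byte sizes are assumed for the example, not taken from the commit): with an INTEGER key estimated at 4 bytes and a DOUBLE value at 8 bytes, a MAP column is now sized as (4 + 8) * 16 = 192 bytes instead of the former flat 128, so wider key and value types produce proportionally larger estimates:

    // illustration only, assuming 4-byte INTEGER and 8-byte DOUBLE estimates
    val mapEstimate = (4 + 8) * 16  // 192, replaces the previous constant 128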

http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
new file mode 100644
index 0000000..ca80737
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.util.{HashMap => JHashMap}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class MapTypeTest extends ExpressionTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongKeyType(): Unit = {
+    testSqlApi("f4[12]", "FAIL")
+  }
+
+  @Test
+  def testItem(): Unit = {
+    testSqlApi("f0['map is null']", "null")
+    testSqlApi("f1['map is empty']", "null")
+    testSqlApi("f2['b']", "13")
+    testSqlApi("f3[1]", "null")
+    testSqlApi("f3[12]", "a")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  override def testData: Any = {
+    val testData = new Row(4)
+    testData.setField(0, null)
+    testData.setField(1, new JHashMap[String, Int]())
+    val map = new JHashMap[String, Int]()
+    map.put("a", 12)
+    map.put("b", 13)
+    testData.setField(2, map)
+    val map2 = new JHashMap[Int, String]()
+    map2.put(12, "a")
+    map2.put(13, "b")
+    testData.setField(3, map2)
+    testData
+  }
+
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+      new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+      new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+      new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+
+}
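
For present and absent keys, the expectations in testItem mirror plain java.util.HashMap lookup semantics (a missing key yields NULL rather than an error); a null or empty map additionally yields NULL. A small standalone sketch of the lookup part (not part of the commit):

    import java.util.{HashMap => JHashMap}

    val m = new JHashMap[Int, String]()
    m.put(12, "a")
    m.put(13, "b")
    println(m.get(1))   // null -> matches testSqlApi("f3[1]", "null")
    println(m.get(12))  // a    -> matches testSqlApi("f3[12]", "a")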