You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:32 UTC
[25/50] [abbrv] flink git commit: [FLINK-6377] [table] Add additional
map tests
[FLINK-6377] [table] Add additional map tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d49efbd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d49efbd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d49efbd2
Branch: refs/heads/table-retraction
Commit: d49efbd222c1aa963f3f9a7fb3cf359071d1bbd3
Parents: 5b6e71c
Author: twalthr <tw...@apache.org>
Authored: Wed Apr 26 18:04:00 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Sun Apr 30 18:59:05 2017 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 1 +
.../org/apache/flink/table/api/Types.scala | 12 +++-
.../flink/table/codegen/ExpressionReducer.scala | 7 +-
.../flink/table/plan/nodes/FlinkRelNode.scala | 5 +-
.../flink/table/expressions/MapTypeTest.scala | 72 ++++++++++++++++++++
5 files changed, 93 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 022a73d..2b777c6 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1747,6 +1747,7 @@ The Table API is built on top of Flink's DataSet and DataStream API. Internally,
| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` |
| `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]` |
| `Types.OBJECT_ARRAY` | `ARRAY` | e.g. `java.lang.Byte[]`|
+| `Types.MAP` | `MAP` | `java.util.HashMap` |
Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and array types (object or primitive arrays) can be fields of a row.
http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
index f22fa32..2152b72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.api
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row
@@ -100,4 +100,14 @@ object Types {
def OBJECT_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
ObjectArrayTypeInfo.getInfoFor(elementType)
}
+
+ /**
+ * Generates type information for a Java HashMap by creating a MapTypeInfo.
+ *
+ * @param keyType type information of the keys of the map, e.g. Types.STRING
+ * @param valueType type information of the values of the map, e.g. Types.STRING
+ */
+ def MAP(keyType: TypeInformation[_], valueType: TypeInformation[_]): TypeInformation[_] = {
+ new MapTypeInfo(keyType, valueType)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 3fcbdc1..b7e1335 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -65,7 +65,10 @@ class ExpressionReducer(config: TableConfig)
)
// we don't support object literals yet, we skip those constant expressions
- case (SqlTypeName.ANY, _) | (SqlTypeName.ROW, _) | (SqlTypeName.ARRAY, _) => None
+ case (SqlTypeName.ANY, _) |
+ (SqlTypeName.ROW, _) |
+ (SqlTypeName.ARRAY, _) |
+ (SqlTypeName.MAP, _) => None
case (_, e) => Some(e)
}
@@ -103,7 +106,7 @@ class ExpressionReducer(config: TableConfig)
val unreduced = constExprs.get(i)
unreduced.getType.getSqlTypeName match {
// we insert the original expression for object literals
- case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
+ case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY | SqlTypeName.MAP =>
reducedValues.add(unreduced)
case _ =>
val reducedValue = reduced.getField(reducedIdx)
http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 7554ea9..0b244e9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -93,7 +93,10 @@ trait FlinkRelNode extends RelNode {
case SqlTypeName.ARRAY =>
// 16 is an arbitrary estimate
estimateDataTypeSize(t.getComponentType) * 16
- case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate
+ case SqlTypeName.MAP =>
+ // 16 is an arbitrary estimate
+ (estimateDataTypeSize(t.getKeyType) + estimateDataTypeSize(t.getValueType)) * 16
+ case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
case _ => throw TableException(s"Unsupported data type encountered: $t")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d49efbd2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
new file mode 100644
index 0000000..ca80737
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.util.{HashMap => JHashMap}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class MapTypeTest extends ExpressionTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongKeyType(): Unit = {
+ testSqlApi("f2[12]", "FAIL") // f2 has STRING keys, so an INT key must be rejected
+ }
+
+ @Test
+ def testItem(): Unit = {
+ testSqlApi("f0['map is null']", "null")
+ testSqlApi("f1['map is empty']", "null")
+ testSqlApi("f2['b']", "13")
+ testSqlApi("f3[1]", "null")
+ testSqlApi("f3[12]", "a")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ override def testData: Any = {
+ val testData = new Row(4)
+ testData.setField(0, null) // f0: the map itself is null
+ testData.setField(1, new JHashMap[String, Int]()) // f1: empty map
+ val map = new JHashMap[String, Int]()
+ map.put("a", 12)
+ map.put("b", 13)
+ testData.setField(2, map) // f2: String -> Int entries
+ val map2 = new JHashMap[Int, String]()
+ map2.put(12, "a")
+ map2.put(13, "b")
+ testData.setField(3, map2) // f3: Int -> String entries
+ testData
+ }
+
+ override def typeInfo: TypeInformation[Any] = {
+ new RowTypeInfo(
+ new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+ new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
+ new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+ ).asInstanceOf[TypeInformation[Any]]
+ }
+
+}