You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:33 UTC
[26/50] [abbrv] flink git commit: [FLINK-6377] [table] Support map types in the Table / SQL API
[FLINK-6377] [table] Support map types in the Table / SQL API
This closes #3767.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b6e71ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b6e71ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b6e71ce
Branch: refs/heads/table-retraction
Commit: 5b6e71ceb12d1fdf7d09d70744f3c0a8a4722768
Parents: 0a33431
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Apr 24 23:12:29 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Sun Apr 30 18:59:05 2017 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 13 ++++--
.../flink/table/codegen/CodeGenerator.scala | 22 ++++++---
.../table/codegen/calls/ScalarOperators.scala | 34 +++++++++++++-
.../flink/table/plan/nodes/FlinkRelNode.scala | 2 +-
.../table/plan/schema/MapRelDataType.scala | 49 ++++++++++++++++++++
.../table/api/java/batch/sql/SqlITCase.java | 33 +++++++++++++
6 files changed, 140 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 22a5c9f..7762ff8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,13 +28,12 @@ import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
-import org.apache.flink.table.plan.schema.ArrayRelDataType
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
import org.apache.flink.types.Row
@@ -123,6 +122,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
case oa: ObjectArrayTypeInfo[_, _] =>
new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+ case mp: MapTypeInfo[_, _] =>
+ new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
+ createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+
case ti: TypeInformation[_] =>
new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
@@ -226,6 +229,10 @@ object FlinkTypeFactory {
val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
arrayRelDataType.typeInfo
+ case MAP if relDataType.isInstanceOf[MapRelDataType] =>
+ val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
+ mapRelDataType.typeInfo
+
case _@t =>
throw TableException(s"Type is not supported: $t")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 298fb70..648efe6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -28,9 +28,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.TableConfig
@@ -1414,11 +1414,19 @@ class CodeGenerator(
generateArray(this, resultType, operands)
case ITEM =>
- val array = operands.head
- val index = operands(1)
- requireArray(array)
- requireInteger(index)
- generateArrayElementAt(this, array, index)
+ operands.head.resultType match {
+ case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+ val array = operands.head
+ val index = operands(1)
+ requireInteger(index)
+ generateArrayElementAt(this, array, index)
+
+ case map: MapTypeInfo[_, _] =>
+ val key = operands(1)
+ generateMapGet(this, operands.head, key)
+
+ case _ => throw new CodeGenException("Expect an array or a map.")
+ }
case CARDINALITY =>
val array = operands.head
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 47a81ab..0c5baa6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils._
@@ -911,6 +911,36 @@ object ScalarOperators {
}
}
+ def generateMapGet(
+ codeGenerator: CodeGenerator,
+ map: GeneratedExpression,
+ key: GeneratedExpression)
+ : GeneratedExpression = {
+
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val ty = map.resultType.asInstanceOf[MapTypeInfo[_,_]]
+ val resultType = ty.getValueTypeInfo
+ val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo)
+ val accessCode = if (codeGenerator.nullCheck) {
+ s"""
+ |${map.code}
+ |${key.code}
+ |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
+ |$resultTypeTerm $resultTerm = $nullTerm ?
+ | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+ |""".stripMargin
+ } else {
+ s"""
+ |${map.code}
+ |${key.code}
+ |$resultTypeTerm $resultTerm = ($resultTypeTerm)
+ | ${map.resultTerm}.get(${key.resultTerm});
+ |""".stripMargin
+ }
+ GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
+ }
+
// ----------------------------------------------------------------------------------------------
private def generateUnaryOperatorIfNotNull(
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index ccdddef..7554ea9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -93,7 +93,7 @@ trait FlinkRelNode extends RelNode {
case SqlTypeName.ARRAY =>
// 16 is an arbitrary estimate
estimateDataTypeSize(t.getComponentType) * 16
- case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+ case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate
case _ => throw TableException(s"Unsupported data type encountered: $t")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
new file mode 100644
index 0000000..b3ff99f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.schema
+
+import com.google.common.base.Objects
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MapSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MapRelDataType(
+ val typeInfo: TypeInformation[_],
+ val keyType: RelDataType,
+ val valueType: RelDataType,
+ isNullable: Boolean) extends MapSqlType(keyType, valueType, isNullable) {
+
+ override def toString: String = s"MAP($typeInfo)"
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[MapRelDataType]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: MapRelDataType =>
+ super.equals(that) &&
+ (that canEqual this) &&
+ keyType == that.keyType &&
+ valueType == that.valueType &&
+ isNullable == that.isNullable
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ Objects.hashCode(keyType, valueType)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 5ba67dd..114226c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -18,9 +18,14 @@
package org.apache.flink.table.api.java.batch.sql;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.types.Row;
@@ -32,7 +37,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
@RunWith(Parameterized.class)
public class SqlITCase extends TableProgramsCollectionTestBase {
@@ -138,4 +146,29 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
compareResultAsText(results, expected);
}
+
+ @Test
+ public void testMap() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+ rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+ rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
+ TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+ DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+ tableEnv.registerDataSet("t1", ds1, "a, b");
+
+ String sqlQuery = "SELECT b['foo'] FROM t1";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "bar\n" + "spam\n";
+ compareResultAsText(results, expected);
+ }
}