You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the mailing-list archive.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:33 UTC

[26/50] [abbrv] flink git commit: [FLINK-6377] [table] Support map types in the Table / SQL API

[FLINK-6377] [table] Support map types in the Table / SQL API

This closes #3767.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b6e71ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b6e71ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b6e71ce

Branch: refs/heads/table-retraction
Commit: 5b6e71ceb12d1fdf7d09d70744f3c0a8a4722768
Parents: 0a33431
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Apr 24 23:12:29 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Sun Apr 30 18:59:05 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 13 ++++--
 .../flink/table/codegen/CodeGenerator.scala     | 22 ++++++---
 .../table/codegen/calls/ScalarOperators.scala   | 34 +++++++++++++-
 .../flink/table/plan/nodes/FlinkRelNode.scala   |  2 +-
 .../table/plan/schema/MapRelDataType.scala      | 49 ++++++++++++++++++++
 .../table/api/java/batch/sql/SqlITCase.java     | 33 +++++++++++++
 6 files changed, 140 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 22a5c9f..7762ff8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,13 +28,12 @@ import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
-import org.apache.flink.table.plan.schema.ArrayRelDataType
 import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
 import org.apache.flink.types.Row
 
@@ -123,6 +122,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     case oa: ObjectArrayTypeInfo[_, _] =>
       new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
 
+    case mp: MapTypeInfo[_, _] =>
+      new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
+        createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
@@ -226,6 +229,10 @@ object FlinkTypeFactory {
       val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
       arrayRelDataType.typeInfo
 
+    case MAP if relDataType.isInstanceOf[MapRelDataType] =>
+      val mapRelDataType = relDataType.asInstanceOf[MapRelDataType]
+      mapRelDataType.typeInfo
+
     case _@t =>
       throw TableException(s"Type is not supported: $t")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 298fb70..648efe6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -28,9 +28,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.api.TableConfig
@@ -1414,11 +1414,19 @@ class CodeGenerator(
         generateArray(this, resultType, operands)
 
       case ITEM =>
-        val array = operands.head
-        val index = operands(1)
-        requireArray(array)
-        requireInteger(index)
-        generateArrayElementAt(this, array, index)
+        operands.head.resultType match {
+          case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+            val array = operands.head
+            val index = operands(1)
+            requireInteger(index)
+            generateArrayElementAt(this, array, index)
+
+          case map: MapTypeInfo[_, _] =>
+            val key = operands(1)
+            generateMapGet(this, operands.head, key)
+
+          case _ => throw new CodeGenException("Expect an array or a map.")
+        }
 
       case CARDINALITY =>
         val array = operands.head

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 47a81ab..0c5baa6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
-import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 
@@ -911,6 +911,36 @@ object ScalarOperators {
     }
   }
 
+  def generateMapGet(
+      codeGenerator: CodeGenerator,
+      map: GeneratedExpression,
+      key: GeneratedExpression)
+  : GeneratedExpression = {
+
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val ty = map.resultType.asInstanceOf[MapTypeInfo[_,_]]
+    val resultType = ty.getValueTypeInfo
+    val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo)
+    val accessCode = if (codeGenerator.nullCheck) {
+          s"""
+             |${map.code}
+             |${key.code}
+             |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
+             |$resultTypeTerm $resultTerm = $nullTerm ?
+             |  null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+             |""".stripMargin
+        } else {
+          s"""
+             |${map.code}
+             |${key.code}
+             |$resultTypeTerm $resultTerm = ($resultTypeTerm)
+             | ${map.resultTerm}.get(${key.resultTerm});
+             |""".stripMargin
+        }
+    GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
+  }
+
   // ----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index ccdddef..7554ea9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -93,7 +93,7 @@ trait FlinkRelNode extends RelNode {
     case SqlTypeName.ARRAY =>
       // 16 is an arbitrary estimate
       estimateDataTypeSize(t.getComponentType) * 16
-    case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate
+    case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate
     case _ => throw TableException(s"Unsupported data type encountered: $t")
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
new file mode 100644
index 0000000..b3ff99f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.schema
+
+import com.google.common.base.Objects
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MapSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MapRelDataType(
+   val typeInfo: TypeInformation[_],
+   val keyType: RelDataType,
+   val valueType: RelDataType,
+   isNullable: Boolean) extends MapSqlType(keyType, valueType, isNullable) {
+
+  override def toString: String = s"MAP($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[MapRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: MapRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        keyType == that.keyType &&
+        valueType == that.valueType &&
+        isNullable == that.isNullable
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(keyType, valueType)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 5ba67dd..114226c 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.types.Row;
@@ -32,7 +37,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 @RunWith(Parameterized.class)
 public class SqlITCase extends TableProgramsCollectionTestBase {
@@ -138,4 +146,29 @@ public class SqlITCase extends TableProgramsCollectionTestBase {
 		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
 		compareResultAsText(results, expected);
 	}
+
+	@Test
+	public void testMap() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+		rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+		rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
+		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+		DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+		tableEnv.registerDataSet("t1", ds1, "a, b");
+
+		String sqlQuery = "SELECT b['foo'] FROM t1";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "bar\n" + "spam\n";
+		compareResultAsText(results, expected);
+	}
 }