You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/21 16:09:49 UTC
[1/2] flink git commit: [FLINK-8038] [table] Clear maps and support cardinality
Repository: flink
Updated Branches:
refs/heads/master 52599ff33 -> 9e3439c01
[FLINK-8038] [table] Clear maps and support cardinality
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e3439c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e3439c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e3439c0
Branch: refs/heads/master
Commit: 9e3439c013928e52ea99fe87579512f1c2b2c28e
Parents: c5f5615
Author: twalthr <tw...@apache.org>
Authored: Tue Nov 21 16:57:04 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Tue Nov 21 17:09:02 2017 +0100
----------------------------------------------------------------------
.../flink/table/api/scala/expressionDsl.scala | 4 +-
.../flink/table/codegen/CodeGenerator.scala | 16 +-
.../table/codegen/calls/ScalarOperators.scala | 54 +++--
.../table/expressions/ExpressionUtils.scala | 4 -
.../apache/flink/table/expressions/array.scala | 89 --------
.../flink/table/expressions/cardinality.scala | 50 -----
.../flink/table/expressions/collection.scala | 207 +++++++++++++++++++
.../apache/flink/table/expressions/item.scala | 76 -------
.../apache/flink/table/expressions/map.scala | 76 -------
.../flink/table/expressions/MapTypeTest.scala | 6 +
.../table/runtime/stream/table/CalcITCase.scala | 39 ++++
11 files changed, 292 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 72a5561..2708b5c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1050,7 +1050,7 @@ object randInteger {
*/
object concat {
def apply(string: Expression, strings: Expression*): Expression = {
- new Concat(Seq(string) ++ strings)
+ Concat(Seq(string) ++ strings)
}
}
@@ -1063,7 +1063,7 @@ object concat {
**/
object concat_ws {
def apply(separator: Expression, string: Expression, strings: Expression*): Expression = {
- new ConcatWs(separator, Seq(string) ++ strings)
+ ConcatWs(separator, Seq(string) ++ strings)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index b51cdbe..91fb619 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -963,15 +963,13 @@ abstract class CodeGenerator(
case ITEM =>
operands.head.resultType match {
- case _: ObjectArrayTypeInfo[_, _] |
- _: BasicArrayTypeInfo[_, _] |
- _: PrimitiveArrayTypeInfo[_] =>
+ case t: TypeInformation[_] if isArray(t) =>
val array = operands.head
val index = operands(1)
requireInteger(index)
generateArrayElementAt(this, array, index)
- case _: MapTypeInfo[_, _] =>
+ case t: TypeInformation[_] if isMap(t) =>
val key = operands(1)
generateMapGet(this, operands.head, key)
@@ -980,16 +978,12 @@ abstract class CodeGenerator(
case CARDINALITY =>
operands.head.resultType match {
- case _: ObjectArrayTypeInfo[_, _] |
- _: BasicArrayTypeInfo[_, _] |
- _: PrimitiveArrayTypeInfo[_] =>
+ case t: TypeInformation[_] if isArray(t) =>
val array = operands.head
- requireArray(array)
generateArrayCardinality(nullCheck, array)
- case _: MapTypeInfo[_, _] =>
+ case t: TypeInformation[_] if isMap(t) =>
val map = operands.head
- requireMap(map)
generateMapCardinality(nullCheck, map)
case _ => throw new CodeGenException("Expect an array or a map.")
@@ -1580,7 +1574,7 @@ abstract class CodeGenerator(
/**
* Adds a reusable hash map to the member area of the generated [[Function]].
*/
- def addReusableMap(clazz: Class[_]): String = {
+ def addReusableMap(): String = {
val fieldTerm = newName("map")
val classQualifier = "java.util.Map"
val initMap = "java.util.HashMap()"
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 522d826..a6d77c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -1079,8 +1079,9 @@ object ScalarOperators {
codeGenerator: CodeGenerator,
resultType: TypeInformation[_],
elements: Seq[GeneratedExpression])
- : GeneratedExpression = {
- val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass)
+ : GeneratedExpression = {
+
+ val mapTerm = codeGenerator.addReusableMap()
val boxedElements: Seq[GeneratedExpression] = resultType match {
case mti: MapTypeInfo[_, _] =>
@@ -1103,8 +1104,15 @@ object ScalarOperators {
}
}
+ // clear the map when it is not guaranteed that keys are constant
+ var clearMap: Boolean = false
+
val code = boxedElements.grouped(2)
.map { case Seq(key, value) =>
+ // check if all keys are constant
+ if (!key.literal) {
+ clearMap = true
+ }
s"""
|${key.code}
|${value.code}
@@ -1113,14 +1121,18 @@ object ScalarOperators {
}
.mkString("\n")
- GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+ GeneratedExpression(
+ mapTerm,
+ GeneratedExpression.NEVER_NULL,
+ (if (clearMap) s"$mapTerm.clear();\n" else "") + code,
+ resultType)
}
def generateMapGet(
codeGenerator: CodeGenerator,
map: GeneratedExpression,
key: GeneratedExpression)
- : GeneratedExpression = {
+ : GeneratedExpression = {
val resultTerm = newName("result")
val nullTerm = newName("isNull")
@@ -1128,30 +1140,30 @@ object ScalarOperators {
val resultType = ty.getValueTypeInfo
val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo)
val accessCode = if (codeGenerator.nullCheck) {
- s"""
- |${map.code}
- |${key.code}
- |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
- |$resultTypeTerm $resultTerm = $nullTerm ?
- | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
- |""".stripMargin
- } else {
- s"""
- |${map.code}
- |${key.code}
- |$resultTypeTerm $resultTerm = ($resultTypeTerm)
- | ${map.resultTerm}.get(${key.resultTerm});
- |""".stripMargin
- }
+ s"""
+ |${map.code}
+ |${key.code}
+ |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
+ |$resultTypeTerm $resultTerm = $nullTerm ?
+ | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+ |""".stripMargin
+ } else {
+ s"""
+ |${map.code}
+ |${key.code}
+ |$resultTypeTerm $resultTerm = ($resultTypeTerm)
+ | ${map.resultTerm}.get(${key.resultTerm});
+ |""".stripMargin
+ }
GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
}
def generateMapCardinality(
nullCheck: Boolean,
map: GeneratedExpression)
- : GeneratedExpression = {
+ : GeneratedExpression = {
generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) {
- (operandTerm) => s"${map.resultTerm}.size"
+ (operandTerm) => s"$operandTerm.size()"
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 3b52ab4..08abc8f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -138,10 +138,6 @@ object ExpressionUtils {
}
}
- private[flink] def convertMap(map: Map[Expression, Expression]): Expression = {
- MapConstructor(map.flatMap(entry => Seq(entry._1, entry._2)).toSeq)
- }
-
// ----------------------------------------------------------------------------------------------
// RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
deleted file mode 100644
index c43bddd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-import scala.collection.JavaConverters._
-
-case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
-
- override private[flink] def children: Seq[Expression] = elements
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- val relDataType = relBuilder
- .asInstanceOf[FlinkRelBuilder]
- .getTypeFactory
- .createTypeFromTypeInfo(resultType, isNullable = false)
- val values = elements.map(_.toRexNode).toList.asJava
- relBuilder
- .getRexBuilder
- .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values)
- }
-
- override def toString = s"array(${elements.mkString(", ")})"
-
- override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
-
- override private[flink] def validateInput(): ValidationResult = {
- if (elements.isEmpty) {
- return ValidationFailure("Empty arrays are not supported yet.")
- }
- val elementType = elements.head.resultType
- if (!elements.forall(_.resultType == elementType)) {
- ValidationFailure("Not all elements of the array have the same type.")
- } else {
- ValidationSuccess
- }
- }
-}
-
-case class ArrayElement(array: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = Seq(array)
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder
- .getRexBuilder
- .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
- }
-
- override def toString = s"($array).element()"
-
- override private[flink] def resultType = array.resultType match {
- case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
- case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
- case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
- }
-
- override private[flink] def validateInput(): ValidationResult = {
- array.resultType match {
- case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
- case other@_ => ValidationFailure(s"Array expected but was '$other'.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
deleted file mode 100644
index aaf52b0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-case class Cardinality(container: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = Seq(container)
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder
- .getRexBuilder
- .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
- }
-
- override def toString = s"($container).cardinality()"
-
- override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- container.resultType match {
- case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
- case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
- case other@_ => ValidationFailure(s"Array expected but was '$other'.")
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
new file mode 100644
index 0000000..a3c6a54
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = elements
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val relDataType = relBuilder
+ .asInstanceOf[FlinkRelBuilder]
+ .getTypeFactory
+ .createTypeFromTypeInfo(resultType, isNullable = false)
+ val values = elements.map(_.toRexNode).toList.asJava
+ relBuilder
+ .getRexBuilder
+ .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values)
+ }
+
+ override def toString = s"array(${elements.mkString(", ")})"
+
+ override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (elements.isEmpty) {
+ return ValidationFailure("Empty arrays are not supported yet.")
+ }
+ val elementType = elements.head.resultType
+ if (!elements.forall(_.resultType == elementType)) {
+ ValidationFailure("Not all elements of the array have the same type.")
+ } else {
+ ValidationSuccess
+ }
+ }
+}
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+ override private[flink] def children: Seq[Expression] = elements
+
+ private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+ new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+ new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+ val relDataType = typeFactory.createMapType(
+ typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true),
+ typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true)
+ )
+ val values = elements.map(_.toRexNode).toList.asJava
+ relBuilder
+ .getRexBuilder
+ .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
+ }
+
+ override def toString = s"map(${elements
+ .grouped(2)
+ .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
+
+ override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
+ elements.head.resultType,
+ elements.last.resultType
+ )
+
+ override private[flink] def validateInput(): ValidationResult = {
+ if (elements.isEmpty) {
+ return ValidationFailure("Empty maps are not supported yet.")
+ }
+ if (elements.size % 2 != 0) {
+ return ValidationFailure("Maps must have an even number of elements to form key-value pairs.")
+ }
+ if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) {
+ return ValidationFailure("Not all key elements of the map literal have the same type.")
+ }
+ if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) {
+ return ValidationFailure("Not all value elements of the map literal have the same type.")
+ }
+ ValidationSuccess
+ }
+}
+
+case class ArrayElement(array: Expression) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = Seq(array)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
+ }
+
+ override def toString = s"($array).element()"
+
+ override private[flink] def resultType = array.resultType match {
+ case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+ case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ array.resultType match {
+ case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+ case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+ }
+ }
+}
+
+case class Cardinality(container: Expression) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = Seq(container)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
+ }
+
+ override def toString = s"($container).cardinality()"
+
+ override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ container.resultType match {
+ case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
+ case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+ case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+ }
+ }
+}
+
+case class ItemAt(container: Expression, key: Expression) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = Seq(container, key)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+ }
+
+ override def toString = s"($container).at($key)"
+
+ override private[flink] def resultType = container.resultType match {
+ case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
+ case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+ case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ container.resultType match {
+
+ case ati: TypeInformation[_] if isArray(ati) =>
+ if (key.resultType == INT_TYPE_INFO) {
+ // check for common user mistake
+ key match {
+ case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+ ValidationFailure(
+ s"Array element access needs an index starting at 1 but was $value.")
+ case _ => ValidationSuccess
+ }
+ } else {
+ ValidationFailure(
+ s"Array element access needs an integer index but was '${key.resultType}'.")
+ }
+
+ case mti: MapTypeInfo[_, _] =>
+ if (key.resultType == mti.getKeyTypeInfo) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(
+ s"Map entry access needs a valid key of type " +
+ s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
+ }
+
+ case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
deleted file mode 100644
index 75a1224..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
-import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-case class ItemAt(container: Expression, key: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = Seq(container, key)
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder
- .getRexBuilder
- .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
- }
-
- override def toString = s"($container).at($key)"
-
- override private[flink] def resultType = container.resultType match {
- case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
- case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
- case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
- case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
- }
-
- override private[flink] def validateInput(): ValidationResult = {
- container.resultType match {
- case ati: TypeInformation[_] if isArray(ati) =>
- if (key.resultType == INT_TYPE_INFO) {
- // check for common user mistake
- key match {
- case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
- ValidationFailure(
- s"Array element access needs an index starting at 1 but was $value.")
- case _ => ValidationSuccess
- }
- } else {
- ValidationFailure(
- s"Array element access needs an integer index but was '${key.resultType}'.")
- }
- case mti: MapTypeInfo[_, _] =>
- if (key.resultType == mti.getKeyTypeInfo) {
- ValidationSuccess
- } else {
- ValidationFailure(
- s"Map key-value access needs a valid key of type " +
- s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
- }
- case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
deleted file mode 100644
index bf71401..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.plan.schema.MapRelDataType
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-import scala.collection.JavaConverters._
-
-case class MapConstructor(elements: Seq[Expression]) extends Expression {
- override private[flink] def children: Seq[Expression] = elements
-
- private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
- new GenericTypeInfo[AnyRef](classOf[AnyRef]),
- new GenericTypeInfo[AnyRef](classOf[AnyRef]))
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
- val relDataType = typeFactory.createMapType(
- typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true),
- typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true)
- )
- val values = elements.map(_.toRexNode).toList.asJava
- relBuilder
- .getRexBuilder
- .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
- }
-
- override def toString = s"map(${elements
- .grouped(2)
- .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
-
- override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
- elements.head.resultType,
- elements.last.resultType
- )
-
- override private[flink] def validateInput(): ValidationResult = {
- if (elements.isEmpty) {
- return ValidationFailure("Empty maps are not supported yet.")
- }
- if (elements.size % 2 != 0) {
- return ValidationFailure("Maps must have even number of elements to form key value pairs.")
- }
- if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) {
- return ValidationFailure("Not all key elements of the map literal have the same type.")
- }
- if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) {
- return ValidationFailure("Not all value elements of the map literal have the same type.")
- }
- ValidationSuccess
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index a2f2a0b..b173349 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -182,6 +182,12 @@ class MapTypeTest extends MapTypeTestBase {
"f3.at(12)",
"f3[12]",
"a")
+
+ testAllApis(
+ 'f3.cardinality(),
+ "f3.cardinality()",
+ "CARDINALITY(f3)",
+ "2")
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index ca6da80..03dd6db 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -313,4 +313,43 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
)
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testMapType(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env)
+ .toTable(tEnv)
+ .select(map('_1, '_3))
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "{10=Comment#4}",
+ "{11=Comment#5}",
+ "{12=Comment#6}",
+ "{13=Comment#7}",
+ "{14=Comment#8}",
+ "{15=Comment#9}",
+ "{16=Comment#10}",
+ "{17=Comment#11}",
+ "{18=Comment#12}",
+ "{19=Comment#13}",
+ "{1=Hi}",
+ "{20=Comment#14}",
+ "{21=Comment#15}",
+ "{2=Hello}",
+ "{3=Hello world}",
+ "{4=Hello world, how are you?}",
+ "{5=I am fine.}",
+ "{6=Luke Skywalker}",
+ "{7=Comment#1}",
+ "{8=Comment#2}",
+ "{9=Comment#3}")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
[2/2] flink git commit: [FLINK-8038] [table] Support map value
constructor, cardinality, and item
Posted by tw...@apache.org.
[FLINK-8038] [table] Support map value constructor, cardinality, and item
This closes #5015.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5f5615c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5f5615c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5f5615c
Branch: refs/heads/master
Commit: c5f5615cf84026039614701b5e6b3b0e003eada0
Parents: 52599ff
Author: Rong Rong <ro...@uber.com>
Authored: Tue Nov 14 10:48:16 2017 -0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Nov 21 17:09:02 2017 +0100
----------------------------------------------------------------------
docs/dev/table/sql.md | 45 +++++
docs/dev/table/tableApi.md | 48 ++++-
.../flink/table/api/scala/expressionDsl.scala | 25 ++-
.../flink/table/calcite/FlinkTypeFactory.scala | 11 ++
.../flink/table/codegen/CodeGenUtils.scala | 5 +
.../flink/table/codegen/CodeGenerator.scala | 38 +++-
.../table/codegen/calls/ScalarOperators.scala | 64 +++++++
.../table/expressions/ExpressionParser.scala | 2 +
.../table/expressions/ExpressionUtils.scala | 4 +
.../apache/flink/table/expressions/array.scala | 60 -------
.../flink/table/expressions/cardinality.scala | 50 ++++++
.../apache/flink/table/expressions/item.scala | 76 ++++++++
.../apache/flink/table/expressions/map.scala | 76 ++++++++
.../flink/table/plan/ProjectionTranslator.scala | 7 +
.../flink/table/typeutils/TypeCheckUtils.scala | 5 +-
.../flink/table/validate/FunctionCatalog.scala | 16 +-
.../flink/table/expressions/ArrayTypeTest.scala | 9 +
.../flink/table/expressions/MapTypeTest.scala | 173 ++++++++++++++++++-
.../table/expressions/SqlExpressionTest.scala | 2 +
.../expressions/utils/MapTypeTestBase.scala | 23 ++-
.../validation/MapTypeValidationTest.scala | 2 +-
21 files changed, 652 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 3097d9e..50aa933 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -2310,6 +2310,51 @@ ELEMENT(ARRAY)
</tbody>
</table>
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 40%">Map functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight text %}
+MAP ‘[’ key, value [, key, value ]* ‘]’
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Creates a map from a list of key-value pairs.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+CARDINALITY(MAP)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the number of entries of a map.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight text %}
+map ‘[’ key ‘]’
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the value specified by a particular key in a map.</p>
+ </td>
+ </tr>
+ </tbody>
+</table>
+
### Unsupported Functions
The following functions are not supported yet:
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index f5a2059..498dbbc 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1610,7 +1610,7 @@ rowInterval = composite , "." , "rows" ;
cast = composite , ".cast(" , dataType , ")" ;
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
as = composite , ".as(" , fieldReference , ")" ;
@@ -2950,6 +2950,52 @@ ARRAY.element()
<table class="table table-bordered">
<thead>
<tr>
+ <th class="text-left" style="width: 40%">Map functions</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+
+ <tr>
+ <td>
+ {% highlight java %}
+map(ANY, ANY [, ANY, ANY ]*)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Creates a map from a list of key-value pairs.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+MAP.cardinality()
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the number of entries of a map.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
+MAP.at(ANY)
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Returns the value specified by a particular key in a map.</p>
+ </td>
+ </tr>
+
+ </tbody>
+</table>
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
<th class="text-left" style="width: 40%">Auxiliary functions</th>
<th class="text-center">Description</th>
</tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index b62e142..72a5561 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -683,19 +683,19 @@ trait ImplicitExpressionOperations {
def flatten() = Flattening(expr)
/**
- * Accesses the element of an array based on an index (starting at 1).
+ * Accesses the element of an array or map based on a key or an index (starting at 1).
*
- * @param index position of the element (starting at 1)
+ * @param index key or position of the element (array index starting at 1)
* @return value of the element
*/
- def at(index: Expression) = ArrayElementAt(expr, index)
+ def at(index: Expression) = ItemAt(expr, index)
/**
- * Returns the number of elements of an array.
+ * Returns the number of elements of an array or the number of entries of a map.
*
- * @return number of elements
+ * @return number of elements or entries
*/
- def cardinality() = ArrayCardinality(expr)
+ def cardinality() = Cardinality(expr)
/**
* Returns the sole element of an array with a single element. Returns null if the array is
@@ -960,6 +960,19 @@ object array {
}
/**
+ * Creates a map from key-value expression pairs. Keys and values are objects (not primitives).
+ */
+object map {
+
+ /**
+ * Creates a map from key-value expression pairs. Keys and values are objects (not primitives).
+ */
+ def apply(key: Expression, value: Expression, tail: Expression*): Expression = {
+ MapConstructor(Seq(key, value) ++ tail.toSeq)
+ }
+}
+
+/**
* Returns a value that is closer than any other value to pi.
*/
object pi {
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 7bcdc0f..448029b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -222,6 +222,17 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
canonize(relType)
}
+ override def createMapType(keyType: RelDataType, valueType: RelDataType): RelDataType = {
+ val relType = new MapRelDataType(
+ new MapTypeInfo(
+ FlinkTypeFactory.toTypeInfo(keyType),
+ FlinkTypeFactory.toTypeInfo(valueType)),
+ keyType,
+ valueType,
+ isNullable = false)
+ this.canonize(relType)
+ }
+
override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
val relType = new MultisetRelDataType(
MultisetTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 161f9a3..d4ba902 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -198,6 +198,11 @@ object CodeGenUtils {
throw new CodeGenException("Array expression type expected.")
}
+ def requireMap(genExpr: GeneratedExpression): Unit =
+ if (!TypeCheckUtils.isMap(genExpr.resultType)) {
+ throw new CodeGenException("Map expression type expected.")
+ }
+
def requireInteger(genExpr: GeneratedExpression): Unit =
if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
throw new CodeGenException("Integer expression type expected.")
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index a794b08..b51cdbe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -957,6 +957,10 @@ abstract class CodeGenerator(
case ARRAY_VALUE_CONSTRUCTOR =>
generateArray(this, resultType, operands)
+ // maps
+ case MAP_VALUE_CONSTRUCTOR =>
+ generateMap(this, resultType, operands)
+
case ITEM =>
operands.head.resultType match {
case _: ObjectArrayTypeInfo[_, _] |
@@ -975,9 +979,21 @@ abstract class CodeGenerator(
}
case CARDINALITY =>
- val array = operands.head
- requireArray(array)
- generateArrayCardinality(nullCheck, array)
+ operands.head.resultType match {
+ case _: ObjectArrayTypeInfo[_, _] |
+ _: BasicArrayTypeInfo[_, _] |
+ _: PrimitiveArrayTypeInfo[_] =>
+ val array = operands.head
+ requireArray(array)
+ generateArrayCardinality(nullCheck, array)
+
+ case _: MapTypeInfo[_, _] =>
+ val map = operands.head
+ requireMap(map)
+ generateMapCardinality(nullCheck, map)
+
+ case _ => throw new CodeGenException("Expect an array or a map.")
+ }
case ELEMENT =>
val array = operands.head
@@ -1562,6 +1578,22 @@ abstract class CodeGenerator(
}
/**
+ * Adds a reusable hash map to the member area of the generated [[Function]].
+ */
+ def addReusableMap(clazz: Class[_]): String = {
+ val fieldTerm = newName("map")
+ val classQualifier = "java.util.Map"
+ val initMap = "java.util.HashMap()"
+ val fieldMap =
+ s"""
+ |final $classQualifier $fieldTerm =
+ | new $initMap;
+ |""".stripMargin
+ reusableMemberStatements.add(fieldMap)
+ fieldTerm
+ }
+
+ /**
* Adds a reusable timestamp to the beginning of the SAM of the generated [[Function]].
*/
def addReusableTimestamp(): String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index bd5b1f7..522d826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -188,6 +188,13 @@ object ScalarOperators {
(leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)"
}
}
+ // map types
+ else if (isMap(left.resultType) &&
+ left.resultType.getTypeClass == right.resultType.getTypeClass) {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ (leftTerm, rightTerm) => s"java.util.Map.equals($leftTerm, $rightTerm)"
+ }
+ }
// comparable types of same type
else if (isComparable(left.resultType) && left.resultType == right.resultType) {
generateComparison("==", nullCheck, left, right)
@@ -229,6 +236,13 @@ object ScalarOperators {
(leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)"
}
}
+ // map types
+ else if (isMap(left.resultType) &&
+ left.resultType.getTypeClass == right.resultType.getTypeClass) {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ (leftTerm, rightTerm) => s"!java.util.Map.equals($leftTerm, $rightTerm)"
+ }
+ }
// comparable types
else if (isComparable(left.resultType) && left.resultType == right.resultType) {
generateComparison("!=", nullCheck, left, right)
@@ -1061,6 +1075,47 @@ object ScalarOperators {
GeneratedExpression(resultTerm, nullTerm, operatorCode, Types.STRING)
}
+ def generateMap(
+ codeGenerator: CodeGenerator,
+ resultType: TypeInformation[_],
+ elements: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass)
+
+ val boxedElements: Seq[GeneratedExpression] = resultType match {
+ case mti: MapTypeInfo[_, _] =>
+ // we box the elements to also represent null values
+ val boxedKeyTypeTerm = boxedTypeTermForTypeInfo(mti.getKeyTypeInfo)
+ val boxedValueTypeTerm = boxedTypeTermForTypeInfo(mti.getValueTypeInfo)
+
+ elements.zipWithIndex.map { case (element, idx) =>
+ val boxedExpr = codeGenerator.generateOutputFieldBoxing(element)
+ val exprOrNull: String = if (codeGenerator.nullCheck) {
+ if (idx % 2 == 0) {
+ s"${boxedExpr.nullTerm} ? null : ($boxedKeyTypeTerm) ${boxedExpr.resultTerm}"
+ } else {
+ s"${boxedExpr.nullTerm} ? null : ($boxedValueTypeTerm) ${boxedExpr.resultTerm}"
+ }
+ } else {
+ boxedExpr.resultTerm
+ }
+ boxedExpr.copy(resultTerm = exprOrNull)
+ }
+ }
+
+ val code = boxedElements.grouped(2)
+ .map { case Seq(key, value) =>
+ s"""
+ |${key.code}
+ |${value.code}
+ |$mapTerm.put(${key.resultTerm}, ${value.resultTerm});
+ |""".stripMargin
+ }
+ .mkString("\n")
+
+ GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+ }
+
def generateMapGet(
codeGenerator: CodeGenerator,
map: GeneratedExpression,
@@ -1091,6 +1146,15 @@ object ScalarOperators {
GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
}
+ def generateMapCardinality(
+ nullCheck: Boolean,
+ map: GeneratedExpression)
+ : GeneratedExpression = {
+ generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) {
+ (operandTerm) => s"${map.resultTerm}.size"
+ }
+ }
+
// ----------------------------------------------------------------------------------------------
private def generateUnaryOperatorIfNotNull(
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index 201679b..aa82464 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -86,6 +86,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val FALSE: Keyword = Keyword("false")
lazy val PRIMITIVE_ARRAY: Keyword = Keyword("PRIMITIVE_ARRAY")
lazy val OBJECT_ARRAY: Keyword = Keyword("OBJECT_ARRAY")
+ lazy val MAP: Keyword = Keyword("MAP")
lazy val BYTE: Keyword = Keyword("BYTE")
lazy val SHORT: Keyword = Keyword("SHORT")
lazy val INTERVAL_MONTHS: Keyword = Keyword("INTERVAL_MONTHS")
@@ -141,6 +142,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val dataType: PackratParser[TypeInformation[_]] =
PRIMITIVE_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.PRIMITIVE_ARRAY(ct) } |
OBJECT_ARRAY ~ "(" ~> dataType <~ ")" ^^ { ct => Types.OBJECT_ARRAY(ct) } |
+ MAP ~ "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { mt => Types.MAP(mt._1._1, mt._2)} |
BYTE ^^ { e => Types.BYTE } |
SHORT ^^ { e => Types.SHORT } |
INTERVAL_MONTHS ^^ { e => Types.INTERVAL_MONTHS } |
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 08abc8f..3b52ab4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -138,6 +138,10 @@ object ExpressionUtils {
}
}
+ private[flink] def convertMap(map: Map[Expression, Expression]): Expression = {
+ MapConstructor(map.flatMap(entry => Seq(entry._1, entry._2)).toSeq)
+ }
+
// ----------------------------------------------------------------------------------------------
// RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 3288478..c43bddd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -62,66 +62,6 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
}
}
-case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = Seq(array, index)
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder
- .getRexBuilder
- .makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
- }
-
- override def toString = s"($array).at($index)"
-
- override private[flink] def resultType = array.resultType match {
- case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
- case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
- case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
- }
-
- override private[flink] def validateInput(): ValidationResult = {
- array.resultType match {
- case ati: TypeInformation[_] if isArray(ati) =>
- if (index.resultType == INT_TYPE_INFO) {
- // check for common user mistake
- index match {
- case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
- ValidationFailure(
- s"Array element access needs an index starting at 1 but was $value.")
- case _ => ValidationSuccess
- }
- } else {
- ValidationFailure(
- s"Array element access needs an integer index but was '${index.resultType}'.")
- }
- case other@_ => ValidationFailure(s"Array expected but was '$other'.")
- }
- }
-}
-
-case class ArrayCardinality(array: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = Seq(array)
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder
- .getRexBuilder
- .makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
- }
-
- override def toString = s"($array).cardinality()"
-
- override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- array.resultType match {
- case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
- case other@_ => ValidationFailure(s"Array expected but was '$other'.")
- }
- }
-}
-
case class ArrayElement(array: Expression) extends Expression {
override private[flink] def children: Seq[Expression] = Seq(array)
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
new file mode 100644
index 0000000..aaf52b0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+case class Cardinality(container: Expression) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = Seq(container)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
+ }
+
+ override def toString = s"($container).cardinality()"
+
+ override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+ override private[flink] def validateInput(): ValidationResult = {
+ container.resultType match {
+ case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
+ case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+ case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
new file mode 100644
index 0000000..75a1224
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+case class ItemAt(container: Expression, key: Expression) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = Seq(container, key)
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ relBuilder
+ .getRexBuilder
+ .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+ }
+
+ override def toString = s"($container).at($key)"
+
+ override private[flink] def resultType = container.resultType match {
+ case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
+ case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+ case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+ }
+
+ override private[flink] def validateInput(): ValidationResult = {
+ container.resultType match {
+ case ati: TypeInformation[_] if isArray(ati) =>
+ if (key.resultType == INT_TYPE_INFO) {
+ // check for common user mistake
+ key match {
+ case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+ ValidationFailure(
+ s"Array element access needs an index starting at 1 but was $value.")
+ case _ => ValidationSuccess
+ }
+ } else {
+ ValidationFailure(
+ s"Array element access needs an integer index but was '${key.resultType}'.")
+ }
+ case mti: MapTypeInfo[_, _] =>
+ if (key.resultType == mti.getKeyTypeInfo) {
+ ValidationSuccess
+ } else {
+ ValidationFailure(
+ s"Map key-value access needs a valid key of type " +
+ s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
+ }
+ case other@_ => ValidationFailure(s"Array or map expected but was '$other'.")
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
new file mode 100644
index 0000000..bf71401
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.plan.schema.MapRelDataType
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Expression that builds a map from a flat sequence of alternating key and value expressions
+ * (`key1, value1, key2, value2, ...`).
+ *
+ * The key type is taken from the first element and the value type from the last element;
+ * [[validateInput]] ensures that all keys and all values agree with these types.
+ */
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+
+ override private[flink] def children: Seq[Expression] = elements
+
+ // Mutable, generically typed map type; it is not read anywhere inside this class.
+ private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+   new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+   new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+ /**
+  * Translates the constructor into a Calcite `MAP_VALUE_CONSTRUCTOR` call whose map type uses
+  * nullable key and value types derived from the first and last element.
+  */
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+   val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+   val keyType =
+     typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true)
+   val valueType =
+     typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true)
+   val mapType = typeFactory.createMapType(keyType, valueType)
+   val operands = elements.map(_.toRexNode).toList.asJava
+   relBuilder.getRexBuilder.makeCall(mapType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, operands)
+ }
+
+ /** Renders the map as `map([k1: v1], [k2: v2], ...)`. */
+ override def toString: String = {
+   val pairs = elements.grouped(2).map(pair => pair.mkString("[", ": ", "]"))
+   pairs.mkString("map(", ", ", ")")
+ }
+
+ /** Map type whose key type is that of the first element and value type that of the last. */
+ override private[flink] def resultType: TypeInformation[_] =
+   new MapTypeInfo(elements.head.resultType, elements.last.resultType)
+
+ /**
+  * Rejects empty maps, an odd number of elements, and keys or values of differing types.
+  * The checks run in this order and the first failing one determines the message.
+  */
+ override private[flink] def validateInput(): ValidationResult = {
+   // Evaluated lazily so that head/last are only touched once the list is known to be non-empty.
+   def keyTypesDiffer: Boolean =
+     elements.grouped(2).exists(_.head.resultType != elements.head.resultType)
+   def valueTypesDiffer: Boolean =
+     elements.grouped(2).exists(_.last.resultType != elements.last.resultType)
+
+   if (elements.isEmpty) {
+     ValidationFailure("Empty maps are not supported yet.")
+   } else if (elements.size % 2 != 0) {
+     ValidationFailure("Maps must have even number of elements to form key value pairs.")
+   } else if (keyTypesDiffer) {
+     ValidationFailure("Not all key elements of the map literal have the same type.")
+   } else if (valueTypesDiffer) {
+     ValidationFailure("Not all value elements of the map literal have the same type.")
+   } else {
+     ValidationSuccess
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index dfb44b1..97cd9cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -174,6 +174,13 @@ object ProjectionTranslator {
replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames))
c.makeCopy(Array(newArgs))
+ // map constructor
+ case c @ MapConstructor(args) =>
+ val newArgs = c.elements
+ .map((exp: Expression) =>
+ replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames))
+ c.makeCopy(Array(newArgs))
+
// General expression
case e: Expression =>
val newArgs = e.productIterator.map {
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 00627ad..fa7abab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.typeutils
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS}
import org.apache.flink.table.validate._
@@ -79,6 +79,9 @@ object TypeCheckUtils {
case _ => false
}
+ /** Returns `true` if the given type information is a [[MapTypeInfo]], `false` otherwise. */
+ def isMap(dataType: TypeInformation[_]): Boolean = dataType match {
+   case _: MapTypeInfo[_, _] => true
+   case _ => false
+ }
+
def isComparable(dataType: TypeInformation[_]): Boolean =
classOf[Comparable[_]].isAssignableFrom(dataType.getTypeClass) && !isArray(dataType)
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 535fd6e..120bf54 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -237,12 +237,19 @@ object FunctionCatalog {
"dateTimePlus" -> classOf[Plus],
"dateFormat" -> classOf[DateFormat],
+ // item
+ "at" -> classOf[ItemAt],
+
+ // cardinality
+ "cardinality" -> classOf[Cardinality],
+
// array
"array" -> classOf[ArrayConstructor],
- "cardinality" -> classOf[ArrayCardinality],
- "at" -> classOf[ArrayElementAt],
"element" -> classOf[ArrayElement],
+ // map
+ "map" -> classOf[MapConstructor],
+
// window properties
"start" -> classOf[WindowStart],
"end" -> classOf[WindowEnd],
@@ -330,9 +337,12 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.VAR_SAMP,
// ARRAY OPERATORS
SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+ SqlStdOperatorTable.ELEMENT,
+ // MAP OPERATORS
+ SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR,
+ // ARRAY MAP SHARED OPERATORS
SqlStdOperatorTable.ITEM,
SqlStdOperatorTable.CARDINALITY,
- SqlStdOperatorTable.ELEMENT,
// SPECIAL OPERATORS
SqlStdOperatorTable.ROW,
SqlStdOperatorTable.OVERLAPS,
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index 536f6ba..28023dd 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -295,4 +295,13 @@ class ArrayTypeTest extends ArrayTypeTestBase {
"f11 <> f9",
"false")
}
+
+ @Test
+ def testArrayTypeCasting(): Unit = {
+   // Casting f3 to an object array of SQL dates must print both dates unchanged.
+   val targetType = Types.OBJECT_ARRAY(Types.SQL_DATE)
+   val expected = "[1984-03-12, 1984-02-10]"
+   testTableApi('f3.cast(targetType), "f3.cast(OBJECT_ARRAY(SQL_DATE))", expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index b2b8016..a2f2a0b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -18,17 +18,178 @@
package org.apache.flink.table.expressions
+import java.sql.Date
+
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils.MapTypeTestBase
import org.junit.Test
class MapTypeTest extends MapTypeTestBase {
@Test
- def testItem(): Unit = {
- testSqlApi("f0['map is null']", "null")
- testSqlApi("f1['map is empty']", "null")
- testSqlApi("f2['b']", "13")
- testSqlApi("f3[1]", "null")
- testSqlApi("f3[12]", "a")
+ def testMapLiteral(): Unit = { // map construction from literals; each case passes an expression, its string and SQL forms, and the expected output
+ // primitive literals (integer and boolean keys/values)
+ testAllApis(map(1, 1), "map(1, 1)", "MAP[1, 1]", "{1=1}")
+
+ testAllApis(
+ map(true, true),
+ "map(true, true)",
+ "map[TRUE, TRUE]",
+ "{true=true}")
+
+ // object literals (decimals, nested maps, computed entries, null values)
+ testTableApi(map(BigDecimal(1), BigDecimal(1)), "map(1p, 1p)", "{1=1}") // no SQL string is supplied for this case
+
+ testAllApis(
+ map(map(1, 2), map(3, 4)), // a map used as key and a map used as value
+ "map(map(1, 2), map(3, 4))",
+ "MAP[MAP[1, 2], MAP[3, 4]]",
+ "{{1=2}={3=4}}")
+
+ testAllApis(
+ map(1 + 2, 3 * 3, 6 / 3, 4 - 2), // arithmetic operands: entries 3 -> 9 and 2 -> 2
+ "map(1 + 2, 3 * 3, 6 / 3, 4 - 2)",
+ "map[1 + 2, 3 * 3, 6 / 3, 4 - 2]",
+ "{2=2, 3=9}")
+
+ testAllApis(
+ map(1, Null(Types.INT)), // null value of type INT
+ "map(1, Null(INT))",
+ "map[1, NULLIF(1,1)]",
+ "{1=null}")
+
+ // explicitly typed values (BIGINT, DATE, DECIMAL); the SQL side spells the types out
+ testAllApis(
+ map(1, 2L , 3, 4L),
+ "map(1, 2L, 3, 4L)",
+ "MAP[1, CAST(2 AS BIGINT), 3, CAST(4 AS BIGINT)]",
+ "{1=2, 3=4}")
+
+ testAllApis(
+ map(Date.valueOf("1985-04-11"), 1),
+ "map('1985-04-11'.toDate, 1)",
+ "MAP[DATE '1985-04-11', 1]",
+ "{1985-04-11=1}")
+
+ testAllApis(
+ map(BigDecimal(2.0002), BigDecimal(2.0003)),
+ "map(2.0002p, 2.0003p)",
+ "MAP[CAST(2.0002 AS DECIMAL), CAST(2.0003 AS DECIMAL)]",
+ "{2.0002=2.0003}")
+ }
+
+ @Test
+ def testMapField(): Unit = { // maps built from and read out of row fields; MapTypeTestBase sets f1 = {}, f2 = {a=12, b=13}, f3 = {12=a, 13=b}, f4 = "foo", f5 = 12
+ testAllApis(
+ map('f4, 'f5), // string field as key, int field as value
+ "map(f4, f5)",
+ "MAP[f4, f5]",
+ "{foo=12}")
+
+ testAllApis(
+ map('f4, 'f1), // empty map field as value
+ "map(f4, f1)",
+ "MAP[f4, f1]",
+ "{foo={}}")
+
+ testAllApis(
+ map('f2, 'f3), // map fields as both key and value
+ "map(f2, f3)",
+ "MAP[f2, f3]",
+ "{{a=12, b=13}={12=a, 13=b}}")
+
+ testAllApis(
+ map('f1.at("a"), 'f5), // f1 is empty, so the looked-up key is null
+ "map(f1.at('a'), f5)",
+ "MAP[f1['a'], f5]",
+ "{null=12}")
+
+ testAllApis(
+ 'f1, // plain projection of the empty map field
+ "f1",
+ "f1",
+ "{}")
+
+ testAllApis(
+ 'f2, // plain projection of a populated map field
+ "f2",
+ "f2",
+ "{a=12, b=13}")
+
+ testAllApis(
+ 'f2.at("a"), // lookup by string key
+ "f2.at('a')",
+ "f2['a']",
+ "12")
+
+ testAllApis(
+ 'f3.at(12), // lookup by int key
+ "f3.at(12)",
+ "f3[12]",
+ "a")
+
+ testAllApis(
+ map('f4, 'f3).at("foo").at(13), // chained lookup: constructed map first, then the nested map f3
+ "map(f4, f3).at('foo').at(13)",
+ "MAP[f4, f3]['foo'][13]",
+ "b")
+ }
+
+ @Test
+ def testMapOperations(): Unit = {
+
+   // comparison: an INT field against a value looked up in a map (f5 = 12, f2 = {a=12, b=13})
+   testAllApis(
+     'f5 === 'f2.at("a"),
+     "f5 === f2.at('a')",
+     "f5 = f2['a']",
+     "true")
+
+   // element access on a null map field (f0): the result is null
+   testAllApis(
+     'f0.at("map is null"),
+     "f0.at('map is null')",
+     "f0['map is null']",
+     "null")
+
+   // element access on an empty map field (f1): the result is null
+   testAllApis(
+     'f1.at("map is empty"),
+     "f1.at('map is empty')",
+     "f1['map is empty']",
+     "null")
+
+   // element access with an existing string key
+   testAllApis(
+     'f2.at("b"),
+     "f2.at('b')",
+     "f2['b']",
+     "13")
+
+   // element access with an int key that is not contained in f3 = {12=a, 13=b}
+   testAllApis(
+     'f3.at(1),
+     "f3.at(1)",
+     "f3[1]",
+     "null")
+
+   // element access with an existing int key
+   testAllApis(
+     'f3.at(12),
+     "f3.at(12)",
+     "f3[12]",
+     "a")
+ }
+
+ @Test
+ def testMapTypeCasting(): Unit = {
+   // Casting f2 to MAP(STRING, INT) must leave its printed content unchanged.
+   val targetType = Types.MAP(Types.STRING, Types.INT)
+   val expected = "{a=12, b=13}"
+   testTableApi('f2.cast(targetType), "f2.cast(MAP(STRING, INT))", expected)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index c631212..7bdfee0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -156,6 +156,8 @@ class SqlExpressionTest extends ExpressionTestBase {
// testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
testSqlApi("ARRAY[TRUE, FALSE][2]", "false")
testSqlApi("ARRAY[TRUE, TRUE]", "[true, true]")
+ testSqlApi("MAP['k1', 'v1', 'k2', 'v2']['k2']", "v2")
+ testSqlApi("MAP['k1', CAST(true AS VARCHAR(256)), 'k2', 'foo']['k1']", "true")
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
index d90d9df..a5df7ec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
@@ -22,22 +22,26 @@ import java.util.{HashMap => JHashMap}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{MapTypeInfo, RowTypeInfo}
+import org.apache.flink.table.api.Types
import org.apache.flink.types.Row
class MapTypeTestBase extends ExpressionTestBase {
override def testData: Any = {
- val testData = new Row(4)
- testData.setField(0, null)
- testData.setField(1, new JHashMap[String, Int]())
- val map = new JHashMap[String, Int]()
- map.put("a", 12)
- map.put("b", 13)
- testData.setField(2, map)
+ val map1 = new JHashMap[String, Int]()
+ map1.put("a", 12)
+ map1.put("b", 13)
val map2 = new JHashMap[Int, String]()
map2.put(12, "a")
map2.put(13, "b")
+ val testData = new Row(7)
+ testData.setField(0, null)
+ testData.setField(1, new JHashMap[String, Int]())
+ testData.setField(2, map1)
testData.setField(3, map2)
+ testData.setField(4, "foo")
+ testData.setField(5, 12)
+ testData.setField(6, Array(1.2, 1.3))
testData
}
@@ -46,7 +50,10 @@ class MapTypeTestBase extends ExpressionTestBase {
new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
- new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
+ new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ Types.STRING,
+ Types.INT,
+ Types.OBJECT_ARRAY(Types.DOUBLE)
).asInstanceOf[TypeInformation[Any]]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c5f5615c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
index 3f8306d..ae85b0d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/MapTypeValidationTest.scala
@@ -26,6 +26,6 @@ class MapTypeValidationTest extends MapTypeTestBase {
@Test(expected = classOf[ValidationException])
def testWrongKeyType(): Unit = {
- testSqlApi("f4[12]", "FAIL")
+ testSqlApi("f2[12]", "FAIL")
}
}