You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/05 06:44:11 UTC
[flink] branch master updated: [FLINK-12796][table-planner-blink]
Introduce BaseArray and BaseMap to reduce conversion overhead
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 906616b [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead
906616b is described below
commit 906616baeaf482662d5ba621c14e513912977c5e
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Fri Jul 5 14:43:59 2019 +0800
[FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead
This closes #8682
---
.../apache/flink/table/codegen/CodeGenUtils.scala | 20 +-
.../apache/flink/table/codegen/GenerateUtils.scala | 2 +-
.../table/codegen/calls/ScalarOperatorGens.scala | 208 +++++++++++----
.../flink/table/codegen/SortCodeGeneratorTest.java | 4 +-
.../table/runtime/utils/RangeInputFormat.java | 67 +++++
.../apache/flink/table/util/BaseRowTestUtil.java | 5 +-
.../runtime/batch/sql/agg/SortAggITCase.scala | 249 ++++++++++++++++-
.../table/runtime/utils/BatchTableEnvUtil.scala | 2 +-
.../flink/table/runtime/utils/BatchTestBase.scala | 24 +-
.../table/dataformat/AbstractBinaryWriter.java | 17 +-
.../apache/flink/table/dataformat/BaseArray.java | 65 +++++
.../org/apache/flink/table/dataformat/BaseMap.java | 53 ++++
.../apache/flink/table/dataformat/BinaryArray.java | 35 ++-
.../flink/table/dataformat/BinaryArrayWriter.java | 4 +
.../apache/flink/table/dataformat/BinaryMap.java | 19 +-
.../apache/flink/table/dataformat/BinaryRow.java | 4 +-
.../flink/table/dataformat/BinaryWriter.java | 21 +-
.../apache/flink/table/dataformat/ColumnarRow.java | 4 +-
.../table/dataformat/DataFormatConverters.java | 186 +++++++++----
.../flink/table/dataformat/GenericArray.java | 295 +++++++++++++++++++++
.../apache/flink/table/dataformat/GenericMap.java} | 44 +--
.../apache/flink/table/dataformat/JoinedRow.java | 4 +-
.../apache/flink/table/dataformat/NestedRow.java | 4 +-
.../flink/table/dataformat/ObjectArrayRow.java | 8 +-
.../flink/table/dataformat/TypeGetterSetters.java | 8 +-
.../flink/table/dataformat/UpdatableRow.java | 6 +-
.../table/runtime/hashtable/BinaryHashTable.java | 4 +-
.../table/runtime/sort/StreamSortOperator.java | 2 +-
.../table/types/ClassLogicalTypeConverter.java | 8 +-
.../flink/table/types/InternalSerializers.java | 14 +-
.../table/types/TypeInfoDataTypeConverter.java | 4 +-
.../table/typeutils/AbstractRowSerializer.java | 2 +-
.../flink/table/typeutils/BaseArraySerializer.java | 280 +++++++++++++++++++
.../flink/table/typeutils/BaseMapSerializer.java | 285 ++++++++++++++++++++
.../flink/table/typeutils/BaseRowSerializer.java | 47 ++--
.../table/typeutils/BinaryArraySerializer.java | 107 --------
.../flink/table/typeutils/BinaryMapSerializer.java | 107 --------
.../flink/table/typeutils/BinaryRowSerializer.java | 2 +-
.../apache/flink/table/dataformat/BaseRowTest.java | 10 +-
.../flink/table/dataformat/BinaryArrayTest.java | 25 +-
.../flink/table/dataformat/BinaryRowTest.java | 4 +-
.../table/runtime/util/BinaryRowKeySelector.java | 10 +-
...lizerTest.java => BaseArraySerializerTest.java} | 48 +++-
.../table/typeutils/BaseMapSerializerTest.java | 99 +++++++
.../table/typeutils/BaseRowSerializerTest.java | 30 ++-
.../typeutils/BinaryGenericSerializerTest.java | 2 +-
.../table/typeutils/BinaryMapSerializerTest.java | 66 -----
.../table/typeutils/BinaryRowSerializerTest.java | 2 +-
.../table/typeutils/DecimalSerializerTest.java | 2 +-
49 files changed, 1972 insertions(+), 546 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 1670a95..e324c12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -66,12 +66,16 @@ object CodeGenUtils {
val BINARY_ARRAY: String = className[BinaryArray]
+ val BASE_ARRAY: String = className[BaseArray]
+
val BINARY_GENERIC: String = className[BinaryGeneric[_]]
val BINARY_STRING: String = className[BinaryString]
val BINARY_MAP: String = className[BinaryMap]
+ val BASE_MAP: String = className[BaseMap]
+
val BASE_ROW: String = className[BaseRow]
val JOINED_ROW: String = className[JoinedRow]
@@ -152,8 +156,8 @@ object CodeGenUtils {
case VARBINARY | BINARY => "byte[]"
case DECIMAL => className[Decimal]
- case ARRAY => className[BinaryArray]
- case MULTISET | MAP => className[BinaryMap]
+ case ARRAY => className[BaseArray]
+ case MULTISET | MAP => className[BaseMap]
case ROW => className[BaseRow]
case ANY => className[BinaryGeneric[_]]
@@ -631,11 +635,15 @@ object CodeGenUtils {
case INTERVAL_DAY_TIME => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
// complex types
- case ARRAY => s"$writerTerm.writeArray($indexTerm, $fieldValTerm)"
- case MULTISET | MAP => s"$writerTerm.writeMap($indexTerm, $fieldValTerm)"
+ case ARRAY =>
+ val ser = ctx.addReusableTypeSerializer(t)
+ s"$writerTerm.writeArray($indexTerm, $fieldValTerm, $ser)"
+ case MULTISET | MAP =>
+ val ser = ctx.addReusableTypeSerializer(t)
+ s"$writerTerm.writeMap($indexTerm, $fieldValTerm, $ser)"
case ROW =>
- val typeTerm = ctx.addReusableObject(t, "rowType")
- s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $typeTerm)"
+ val ser = ctx.addReusableTypeSerializer(t)
+ s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $ser)"
case ANY => s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm)"
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
index 4b39c05..70db90a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
@@ -669,7 +669,7 @@ object GenerateUtils {
SortUtil.getNullDefaultOrder(true), at, "a", "b")
val funcCode: String =
s"""
- public int $compareFunc($BINARY_ARRAY a, $BINARY_ARRAY b) {
+ public int $compareFunc($BASE_ARRAY a, $BASE_ARRAY b) {
$compareCode
return 0;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
index 95b785d..4e8f282 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
@@ -1495,7 +1495,7 @@ object ScalarOperatorGens {
checkArgument(resultType.isInstanceOf[MapType])
val mapType = resultType.asInstanceOf[MapType]
- val mapTerm = newName("map")
+ val baseMap = newName("map")
// prepare map key array
val keyElements = elements.grouped(2).map { case Seq(key, _) => key }.toSeq
@@ -1510,11 +1510,14 @@ object ScalarOperatorGens {
val isValueFixLength = isPrimitive(valueType)
// construct binary map
- ctx.addReusableMember(s"$BINARY_MAP $mapTerm = null;")
+ ctx.addReusableMember(s"$BASE_MAP $baseMap = null;")
val code = if (isKeyFixLength && isValueFixLength) {
+ val binaryMap = newName("binaryMap")
+ ctx.addReusableMember(s"$BINARY_MAP $binaryMap = null;")
// the key and value are fixed length, initialize and reuse the map in constructor
- val init = s"$mapTerm = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});"
+ val init =
+ s"$binaryMap = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});"
ctx.addReusableInitStatement(init)
// there are some non-literal primitive fields need to update
val keyArrayTerm = newName("keyArray")
@@ -1524,20 +1527,21 @@ object ScalarOperatorGens {
val valueUpdate = generatePrimitiveArrayUpdateCode(
ctx, valueArrayTerm, valueType, valueElements)
s"""
- |$BINARY_ARRAY $keyArrayTerm = $mapTerm.keyArray();
+ |$BINARY_ARRAY $keyArrayTerm = $binaryMap.keyArray();
|$keyUpdate
- |$BINARY_ARRAY $valueArrayTerm = $mapTerm.valueArray();
+ |$BINARY_ARRAY $valueArrayTerm = $binaryMap.valueArray();
|$valueUpdate
+ |$baseMap = $binaryMap;
""".stripMargin
} else {
// the key or value is not fixed length, re-create the map on every update
s"""
|${keyExpr.code}
|${valueExpr.code}
- |$mapTerm = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});
+ |$baseMap = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});
""".stripMargin
}
- GeneratedExpression(mapTerm, NEVER_NULL, code, resultType)
+ GeneratedExpression(baseMap, NEVER_NULL, code, resultType)
}
def generateMapGet(
@@ -1551,6 +1555,7 @@ object ScalarOperatorGens {
val values = newName("values")
val index = newName("index")
val found = newName("found")
+ val tmpValue = newName("value")
val mapType = map.resultType.asInstanceOf[MapType]
val keyType = mapType.getKeyType
@@ -1560,42 +1565,59 @@ object ScalarOperatorGens {
val keyTypeTerm = primitiveTypeTermForType(keyType)
val valueTypeTerm = primitiveTypeTermForType(valueType)
val valueDefault = primitiveDefaultValue(valueType)
+ val binaryMapTypeTerm = classOf[BinaryMap].getCanonicalName
+ val binaryMapTerm = newName("binaryMap")
+ val genericMapTypeTerm = classOf[GenericMap].getCanonicalName
+ val genericMapTerm = newName("genericMap")
+ val boxedValueTypeTerm = boxedTypeTermForType(valueType)
val mapTerm = map.resultTerm
val equal = generateEquals(ctx, key, GeneratedExpression(tmpKey, NEVER_NULL, NO_CODE, keyType))
val code =
s"""
- |final int $length = $mapTerm.numElements();
- |final $BINARY_ARRAY $keys = $mapTerm.keyArray();
- |final $BINARY_ARRAY $values = $mapTerm.valueArray();
+ |if ($mapTerm instanceof $binaryMapTypeTerm) {
+ | $binaryMapTypeTerm $binaryMapTerm = ($binaryMapTypeTerm) $mapTerm;
+ | final int $length = $binaryMapTerm.numElements();
+ | final $BINARY_ARRAY $keys = $binaryMapTerm.keyArray();
+ | final $BINARY_ARRAY $values = $binaryMapTerm.valueArray();
|
- |int $index = 0;
- |boolean $found = false;
- |if (${key.nullTerm}) {
- | while ($index < $length && !$found) {
- | if ($keys.isNullAt($index)) {
- | $found = true;
- | } else {
- | $index++;
+ | int $index = 0;
+ | boolean $found = false;
+ | if (${key.nullTerm}) {
+ | while ($index < $length && !$found) {
+ | if ($keys.isNullAt($index)) {
+ | $found = true;
+ | } else {
+ | $index++;
+ | }
| }
- | }
- |} else {
- | while ($index < $length && !$found) {
- | final $keyTypeTerm $tmpKey = ${baseRowFieldReadAccess(ctx, index, keys, keyType)};
- | ${equal.code}
- | if (${equal.resultTerm}) {
- | $found = true;
- | } else {
- | $index++;
+ | } else {
+ | while ($index < $length && !$found) {
+ | final $keyTypeTerm $tmpKey = ${baseRowFieldReadAccess(ctx, index, keys, keyType)};
+ | ${equal.code}
+ | if (${equal.resultTerm}) {
+ | $found = true;
+ | } else {
+ | $index++;
+ | }
| }
| }
- |}
|
- |if (!$found || $values.isNullAt($index)) {
- | $nullTerm = true;
+ | if (!$found || $values.isNullAt($index)) {
+ | $nullTerm = true;
+ | } else {
+ | $resultTerm = ${baseRowFieldReadAccess(ctx, index, values, valueType)};
+ | }
|} else {
- | $resultTerm = ${baseRowFieldReadAccess(ctx, index, values, valueType)};
+ | $genericMapTypeTerm $genericMapTerm = ($genericMapTypeTerm) $mapTerm;
+ | $boxedValueTypeTerm $tmpValue =
+ | ($boxedValueTypeTerm) $genericMapTerm.get(($keyTypeTerm) ${key.resultTerm});
+ | if ($tmpValue == null) {
+ | $nullTerm = true;
+ | } else {
+ | $resultTerm = $tmpValue;
+ | }
|}
""".stripMargin
@@ -1707,8 +1729,12 @@ object ScalarOperatorGens {
val builderTerm = newName("builder")
ctx.addReusableMember(s"$builderCls $builderTerm = new $builderCls();")
- val binaryMapTerm = terms.head
- val arrayCls = classOf[BinaryArray].getCanonicalName
+ val mapTerm = terms.head
+ val genericMapCls = classOf[GenericMap].getCanonicalName
+ val genericMapTerm = newName("genericMap")
+ val binaryMapCls = classOf[BinaryMap].getCanonicalName
+ val binaryMapTerm = newName("binaryMap")
+ val arrayCls = classOf[BaseArray].getCanonicalName
val keyArrayTerm = newName("keyArray")
val valueArrayTerm = newName("valueArray")
@@ -1750,36 +1776,42 @@ object ScalarOperatorGens {
val stmt =
s"""
|String $resultTerm;
- |$arrayCls $keyArrayTerm = $binaryMapTerm.keyArray();
- |$arrayCls $valueArrayTerm = $binaryMapTerm.valueArray();
+ |if ($mapTerm instanceof $binaryMapCls) {
+ | $binaryMapCls $binaryMapTerm = ($binaryMapCls) $mapTerm;
+ | $arrayCls $keyArrayTerm = $binaryMapTerm.keyArray();
+ | $arrayCls $valueArrayTerm = $binaryMapTerm.valueArray();
|
- |$builderTerm.setLength(0);
- |$builderTerm.append("{");
+ | $builderTerm.setLength(0);
+ | $builderTerm.append("{");
|
- |int $numTerm = $binaryMapTerm.numElements();
- |for (int $indexTerm = 0; $indexTerm < $numTerm; $indexTerm++) {
- | if ($indexTerm != 0) {
- | $builderTerm.append(", ");
- | }
+ | int $numTerm = $binaryMapTerm.numElements();
+ | for (int $indexTerm = 0; $indexTerm < $numTerm; $indexTerm++) {
+ | if ($indexTerm != 0) {
+ | $builderTerm.append(", ");
+ | }
|
- | ${keyCastExpr.code}
- | if (${keyCastExpr.nullTerm}) {
- | $builderTerm.append("null");
- | } else {
- | $builderTerm.append(${keyCastExpr.resultTerm});
- | }
- | $builderTerm.append("=");
+ | ${keyCastExpr.code}
+ | if (${keyCastExpr.nullTerm}) {
+ | $builderTerm.append("null");
+ | } else {
+ | $builderTerm.append(${keyCastExpr.resultTerm});
+ | }
+ | $builderTerm.append("=");
|
- | ${valueCastExpr.code}
- | if (${valueCastExpr.nullTerm}) {
- | $builderTerm.append("null");
- | } else {
- | $builderTerm.append(${valueCastExpr.resultTerm});
+ | ${valueCastExpr.code}
+ | if (${valueCastExpr.nullTerm}) {
+ | $builderTerm.append("null");
+ | } else {
+ | $builderTerm.append(${valueCastExpr.resultTerm});
+ | }
| }
- |}
- |$builderTerm.append("}");
+ | $builderTerm.append("}");
|
- |$resultTerm = $builderTerm.toString();
+ | $resultTerm = $builderTerm.toString();
+ |} else {
+ | $genericMapCls $genericMapTerm = ($genericMapCls) $mapTerm;
+ | $resultTerm = $genericMapTerm.toString();
+ |}
""".stripMargin
(stmt, resultTerm)
}
@@ -1900,8 +1932,68 @@ object ScalarOperatorGens {
args =>
val leftTerm = args.head
val rightTerm = args(1)
+
val resultTerm = newName("compareResult")
- val stmt = s"boolean $resultTerm = $leftTerm.equals($rightTerm);"
+ val binaryMapCls = classOf[BinaryMap].getCanonicalName
+
+ val mapType = left.resultType.asInstanceOf[MapType]
+ val mapCls = classOf[java.util.Map[AnyRef, AnyRef]].getCanonicalName
+ val keyCls = boxedTypeTermForType(mapType.getKeyType)
+ val valueCls = boxedTypeTermForType(mapType.getValueType)
+
+ val leftMapTerm = newName("leftMap")
+ val leftKeyTerm = newName("leftKey")
+ val leftValueTerm = newName("leftValue")
+ val leftValueNullTerm = newName("leftValueIsNull")
+ val leftValueExpr =
+ GeneratedExpression(leftValueTerm, leftValueNullTerm, "", mapType.getValueType)
+
+ val rightMapTerm = newName("rightMap")
+ val rightValueTerm = newName("rightValue")
+ val rightValueNullTerm = newName("rightValueIsNull")
+ val rightValueExpr =
+ GeneratedExpression(rightValueTerm, rightValueNullTerm, "", mapType.getValueType)
+
+ val entryTerm = newName("entry")
+ val entryCls = classOf[java.util.Map.Entry[AnyRef, AnyRef]].getCanonicalName
+ val valueEqualsExpr = generateEquals(ctx, leftValueExpr, rightValueExpr)
+
+ val internalTypeCls = classOf[LogicalType].getCanonicalName
+ val keyTypeTerm =
+ ctx.addReusableObject(mapType.getKeyType, "keyType", internalTypeCls)
+ val valueTypeTerm =
+ ctx.addReusableObject(mapType.getValueType, "valueType", internalTypeCls)
+
+ val stmt =
+ s"""
+ |boolean $resultTerm;
+ |if ($leftTerm.numElements() == $rightTerm.numElements()) {
+ | $resultTerm = true;
+ | $mapCls $leftMapTerm = $leftTerm.toJavaMap($keyTypeTerm, $valueTypeTerm);
+ | $mapCls $rightMapTerm = $rightTerm.toJavaMap($keyTypeTerm, $valueTypeTerm);
+ |
+ | for ($entryCls $entryTerm : $leftMapTerm.entrySet()) {
+ | $keyCls $leftKeyTerm = ($keyCls) $entryTerm.getKey();
+ | if ($rightMapTerm.containsKey($leftKeyTerm)) {
+ | $valueCls $leftValueTerm = ($valueCls) $entryTerm.getValue();
+ | $valueCls $rightValueTerm = ($valueCls) $rightMapTerm.get($leftKeyTerm);
+ | boolean $leftValueNullTerm = ($leftValueTerm == null);
+ | boolean $rightValueNullTerm = ($rightValueTerm == null);
+ |
+ | ${valueEqualsExpr.code}
+ | if (!${valueEqualsExpr.resultTerm}) {
+ | $resultTerm = false;
+ | break;
+ | }
+ | } else {
+ | $resultTerm = false;
+ | break;
+ | }
+ | }
+ |} else {
+ | $resultTerm = false;
+ |}
+ """.stripMargin
(stmt, resultTerm)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
index 1526744..890ef80 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.generated.RecordComparator;
import org.apache.flink.table.plan.util.SortUtil;
import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.InternalSerializers;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
@@ -181,7 +182,8 @@ public class SortCodeGeneratorTest {
if (value == null) {
writer.setNullAt(j);
} else {
- BinaryWriter.write(writer, j, value, types[fields[j]]);
+ BinaryWriter.write(writer, j, value, types[fields[j]],
+ InternalSerializers.create(types[fields[j]], new ExecutionConfig()));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java
new file mode 100644
index 0000000..c642c85
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BoxedWrapperRow;
+
+import java.io.IOException;
+
+/**
+ * An input format that returns objects from a range.
+ */
+public class RangeInputFormat extends GenericInputFormat<BaseRow> implements NonParallelInput {
+
+ private static final long serialVersionUID = 1L;
+
+ private long start;
+ private long end;
+
+ private transient long current;
+ private transient BoxedWrapperRow reuse;
+
+ public RangeInputFormat(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return current >= end;
+ }
+
+ @Override
+ public void open(GenericInputSplit split) throws IOException {
+ super.open(split);
+ this.current = start;
+ }
+
+ @Override
+ public BaseRow nextRecord(BaseRow ignore) throws IOException {
+ if (reuse == null) {
+ reuse = new BoxedWrapperRow(1);
+ }
+ reuse.setLong(0, current);
+ current++;
+ return reuse;
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
index 445e124..3225243 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
@@ -18,10 +18,12 @@
package org.apache.flink.table.util;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryWriter;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.types.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
@@ -99,7 +101,8 @@ public class BaseRowTestUtil {
}
public static void write(BinaryWriter writer, int pos, Object o, LogicalType type) {
- BinaryWriter.write(writer, pos, o, type);
+ BinaryWriter.write(writer, pos, o, type,
+ InternalSerializers.create(type, new ExecutionConfig()));
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
index a0e7eec..54b5e0d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
@@ -18,8 +18,8 @@
package org.apache.flink.table.runtime.batch.sql.agg
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableConfigOptions, Types}
import org.apache.flink.table.functions.AggregateFunction
@@ -27,8 +27,10 @@ import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgW
import org.apache.flink.table.runtime.utils.BatchTestBase.row
import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{MyPojo, MyToPojoFunc}
import org.apache.flink.table.util.{CountAccumulator, CountAggFunction, IntSumAggFunction}
+
import org.junit.{Ignore, Test}
+import java.lang
import java.lang.{Iterable => JIterable}
import scala.annotation.varargs
@@ -47,6 +49,26 @@ class SortAggITCase
registerFunction("countFun", new CountAggFunction())
registerFunction("intSumFun", new IntSumAggFunction())
registerFunction("weightedAvg", new WeightedAvgWithMergeAndReset())
+
+ registerFunction("myPrimitiveArrayUdaf", new MyPrimitiveArrayUdaf())
+ registerFunction("myObjectArrayUdaf", new MyObjectArrayUdaf())
+ registerFunction("myNestedLongArrayUdaf", new MyNestedLongArrayUdaf())
+ registerFunction("myNestedStringArrayUdaf", new MyNestedStringArrayUdaf())
+
+ registerFunction("myPrimitiveMapUdaf", new MyPrimitiveMapUdaf())
+ registerFunction("myObjectMapUdaf", new MyObjectMapUdaf())
+ registerFunction("myNestedMapUdaf", new MyNestedMapUdf())
+ }
+
+ @Test
+ def testBigDataSimpleArrayUDAF(): Unit = {
+ tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
+ registerFunction("simplePrimitiveArrayUdaf", new SimplePrimitiveArrayUdaf())
+ registerRange("RangeT", 1000000)
+ env.setParallelism(1)
+ checkResult(
+ "SELECT simplePrimitiveArrayUdaf(id) FROM RangeT",
+ Seq(row(499999500000L)))
}
@Ignore
@@ -241,6 +263,60 @@ class SortAggITCase
)
)
}
+
+ @Test
+ def testArrayUdaf(): Unit = {
+ tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
+ env.setParallelism(1)
+ checkResult(
+ "SELECT myPrimitiveArrayUdaf(a, b) FROM Table3",
+ Seq(row(Array(231, 91)))
+ )
+ checkResult(
+ "SELECT myObjectArrayUdaf(c) FROM Table3",
+ Seq(row(Array("HHHHILCCCCCCCCCCCCCCC", "iod?.r123456789012345")))
+ )
+ checkResult(
+ "SELECT myNestedLongArrayUdaf(a, b)[2] FROM Table3",
+ Seq(row(Array(91, 231)))
+ )
+ checkResult(
+ "SELECT myNestedStringArrayUdaf(c)[2] FROM Table3",
+ Seq(row(Array("iod?.r123456789012345", "HHHHILCCCCCCCCCCCCCCC")))
+ )
+ }
+
+ @Test
+ def testMapUdaf(): Unit = {
+ checkResult(
+ "SELECT myPrimitiveMapUdaf(a, b)[3] FROM Table3",
+ Seq(row(15))
+ )
+ checkResult(
+ "SELECT myPrimitiveMapUdaf(a, b)[6] FROM Table3",
+ Seq(row(111))
+ )
+ checkResult(
+ "SELECT myObjectMapUdaf(a, c)['Co'] FROM Table3",
+ Seq(row(210))
+ )
+ checkResult(
+ "SELECT myObjectMapUdaf(a, c)['He'] FROM Table3",
+ Seq(row(9))
+ )
+ checkResult(
+ "SELECT myNestedMapUdaf(a, b, c)[6]['Co'] FROM Table3",
+ Seq(row(111))
+ )
+ checkResult(
+ "SELECT myNestedMapUdaf(a, b, c)[3]['He'] FROM Table3",
+ Seq(row(4))
+ )
+ checkResult(
+ "SELECT myNestedMapUdaf(a, b, c)[3]['Co'] FROM Table3",
+ Seq(row("null"))
+ )
+ }
}
class MyPojoAggFunction extends AggregateFunction[MyPojo, CountAccumulator] {
@@ -320,3 +396,172 @@ class VarArgsAggFunction extends AggregateFunction[Long, CountAccumulator] {
new TupleTypeInfo[CountAccumulator](classOf[CountAccumulator], Types.LONG)
}
}
+
+class SimplePrimitiveArrayUdaf extends AggregateFunction[lang.Long, Array[Long]] {
+
+ var i = 0
+
+ override def createAccumulator(): Array[Long] = new Array[Long](10000)
+
+ override def getValue(accumulator: Array[Long]): lang.Long = Long.box(accumulator.sum)
+
+ def accumulate(accumulator: Array[Long], a: Long): Unit = {
+ accumulator(i) += a
+ i += 1
+ if (i >= accumulator.length) {
+ i = 0
+ }
+ }
+
+ override def getAccumulatorType: TypeInformation[Array[Long]] =
+ PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+
+ override def getResultType: TypeInformation[lang.Long] = Types.LONG
+}
+
+class MyPrimitiveArrayUdaf extends AggregateFunction[Array[Long], Array[Long]] {
+
+ override def createAccumulator(): Array[Long] = new Array[Long](2)
+
+ override def getValue(accumulator: Array[Long]): Array[Long] = accumulator
+
+ def accumulate(accumulator: Array[Long], a: Int, b: Long): Unit = {
+ accumulator(0) += a
+ accumulator(1) += b
+ }
+
+ override def getAccumulatorType: TypeInformation[Array[Long]] =
+ PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+
+ override def getResultType: TypeInformation[Array[Long]] =
+ PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+}
+
+class MyObjectArrayUdaf extends AggregateFunction[Array[String], Array[String]] {
+
+ override def createAccumulator(): Array[String] = Array("", "")
+
+ override def getValue(accumulator: Array[String]): Array[String] = accumulator
+
+ def accumulate(accumulator: Array[String], c: String): Unit = {
+ accumulator(0) = accumulator(0) + c.charAt(0)
+ accumulator(1) = accumulator(1) + c.charAt(c.length - 1)
+ }
+
+ override def getAccumulatorType: TypeInformation[Array[String]] =
+ BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+
+ override def getResultType: TypeInformation[Array[String]] =
+ BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+}
+
+class MyNestedLongArrayUdaf extends AggregateFunction[Array[Array[Long]], Array[Array[Long]]] {
+
+ override def createAccumulator(): Array[Array[Long]] = Array(Array(0, 0), Array(0, 0))
+
+ override def getValue(accumulator: Array[Array[Long]]): Array[Array[Long]] = accumulator
+
+ def accumulate(accumulator: Array[Array[Long]], a: Int, b: Long): Unit = {
+ accumulator(0)(0) += a
+ accumulator(0)(1) += b
+ accumulator(1)(0) += b
+ accumulator(1)(1) += a
+ }
+
+ override def getAccumulatorType =
+ ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+
+ override def getResultType = getAccumulatorType
+}
+
+class MyNestedStringArrayUdaf extends AggregateFunction[
+ Array[Array[String]], Array[Array[String]]] {
+
+ override def createAccumulator(): Array[Array[String]] = Array(Array("", ""), Array("", ""))
+
+ override def getValue(accumulator: Array[Array[String]]): Array[Array[String]] = accumulator
+
+ def accumulate(accumulator: Array[Array[String]], c: String): Unit = {
+ accumulator(0)(0) = accumulator(0)(0) + c.charAt(0)
+ accumulator(0)(1) = accumulator(0)(1) + c.charAt(c.length - 1)
+ accumulator(1)(0) = accumulator(1)(0) + c.charAt(c.length - 1)
+ accumulator(1)(1) = accumulator(1)(1) + c.charAt(0)
+ }
+
+ override def getAccumulatorType =
+ ObjectArrayTypeInfo.getInfoFor(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+
+ override def getResultType =
+ getAccumulatorType
+}
+
+class MyPrimitiveMapUdaf extends AggregateFunction[
+ java.util.Map[Long, Int], java.util.Map[Long, Int]] {
+
+ override def createAccumulator(): java.util.Map[Long, Int] =
+ new java.util.HashMap[Long, Int]()
+
+ override def getValue(accumulator: java.util.Map[Long, Int]): java.util.Map[Long, Int] =
+ accumulator
+
+ def accumulate(accumulator: java.util.Map[Long, Int], a: Int, b: Long): Unit = {
+ accumulator.putIfAbsent(b, 0)
+ accumulator.put(b, accumulator.get(b) + a)
+ }
+
+ override def getAccumulatorType =
+ new MapTypeInfo(Types.LONG, Types.INT)
+ .asInstanceOf[TypeInformation[java.util.Map[Long, Int]]]
+
+ override def getResultType =
+ getAccumulatorType
+}
+
+class MyObjectMapUdaf extends AggregateFunction[
+ java.util.Map[String, Int], java.util.Map[String, Int]] {
+
+ override def createAccumulator(): java.util.Map[String, Int] =
+ new java.util.HashMap[String, Int]()
+
+ override def getValue(accumulator: java.util.Map[String, Int]): java.util.Map[String, Int] =
+ accumulator
+
+ def accumulate(accumulator: java.util.Map[String, Int], a: Int, c: String): Unit = {
+ val key = c.substring(0, 2)
+ accumulator.putIfAbsent(key, 0)
+ accumulator.put(key, accumulator.get(key) + a)
+ }
+
+ override def getAccumulatorType =
+ new MapTypeInfo(Types.STRING, Types.INT)
+ .asInstanceOf[TypeInformation[java.util.Map[String, Int]]]
+
+ override def getResultType = getAccumulatorType
+}
+
+class MyNestedMapUdf extends AggregateFunction[
+ java.util.Map[Long, java.util.Map[String, Int]],
+ java.util.Map[Long, java.util.Map[String, Int]]] {
+
+ override def createAccumulator(): java.util.Map[Long, java.util.Map[String, Int]] =
+ new java.util.HashMap[Long, java.util.Map[String, Int]]()
+
+ override def getValue(accumulator: java.util.Map[Long, java.util.Map[String, Int]])
+ : java.util.Map[Long, java.util.Map[String, Int]] =
+ accumulator
+
+ def accumulate(
+ accumulator: java.util.Map[Long, java.util.Map[String, Int]],
+ a: Int, b: Long, c: String): Unit = {
+ val key = c.substring(0, 2)
+ accumulator.putIfAbsent(b, new java.util.HashMap[String, Int]())
+ accumulator.get(b).putIfAbsent(key, 0)
+ accumulator.get(b).put(key, accumulator.get(b).get(key) + a)
+ }
+
+ override def getAccumulatorType =
+ new MapTypeInfo(Types.LONG, new MapTypeInfo(Types.STRING, Types.INT))
+ .asInstanceOf[TypeInformation[java.util.Map[Long, java.util.Map[String, Int]]]]
+
+ override def getResultType = getAccumulatorType
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
index fb1c2c6..da88b70 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
@@ -145,7 +145,7 @@ object BatchTableEnvUtil {
* @param boundedStream The [[DataStream]] to register as table in the catalog.
* @tparam T the type of the [[DataStream]].
*/
- protected def registerBoundedStreamInternal[T](
+ def registerBoundedStreamInternal[T](
tEnv: BatchTableEnvironment,
name: String,
boundedStream: DataStream[T],
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index f97f5e6..88e63ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -22,17 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaExecEnv}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecEnv}
import org.apache.flink.table.api.internal.TableImpl
import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
import org.apache.flink.table.api.{SqlParserException, Table, TableConfig, TableConfigOptions, TableEnvironment}
-import org.apache.flink.table.dataformat.{BinaryRow, BinaryRowWriter}
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter}
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.plan.stats.FlinkStatistic
import org.apache.flink.table.plan.util.FlinkRelOptUtil
import org.apache.flink.table.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{BigIntType, LogicalType}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.table.util.{BaseRowTestUtil, DiffRepository, TableTestUtil}
import org.apache.flink.types.Row
@@ -431,6 +432,23 @@ class BatchTestBase extends BatchAbstractTestBase {
f: AggregateFunction[T, ACC]): Unit = {
tEnv.registerFunction(name, f)
}
+
+ def registerRange(name: String, end: Long): Unit = {
+ registerRange(name, 0, end)
+ }
+
+ def registerRange(name: String, start: Long, end: Long): Unit = {
+ BatchTableEnvUtil.registerBoundedStreamInternal(
+ tEnv, name, newRangeSource(start, end).javaStream,
+ Some[Array[String]](Array[String]("id")), None, None)
+ }
+
+ def newRangeSource(start: Long, end: Long): DataStream[BaseRow] = {
+ implicit val typeInfo: TypeInformation[BaseRow] = new BaseRowTypeInfo(new BigIntType)
+ val boundedStream = env.createInput(new RangeInputFormat(start, end))
+ boundedStream.setParallelism(1)
+ boundedStream
+ }
}
object BatchTestBase {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index 9c66826..d45da77 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.dataformat;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.util.SegmentsUtil;
@@ -97,13 +98,15 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
}
@Override
- public void writeArray(int pos, BinaryArray input) {
- writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
+ public void writeArray(int pos, BaseArray input, BaseArraySerializer serializer) {
+ BinaryArray binary = serializer.toBinaryArray(input);
+ writeSegmentsToVarLenPart(pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}
@Override
- public void writeMap(int pos, BinaryMap input) {
- writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
+ public void writeMap(int pos, BaseMap input, BaseMapSerializer serializer) {
+ BinaryMap binary = serializer.toBinaryMap(input);
+ writeSegmentsToVarLenPart(pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}
private DataOutputViewStreamWrapper getOutputView() {
@@ -135,12 +138,12 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
}
@Override
- public void writeRow(int pos, BaseRow input, RowType type) {
+ public void writeRow(int pos, BaseRow input, BaseRowSerializer serializer) {
if (input instanceof BinaryFormat) {
BinaryFormat row = (BinaryFormat) input;
writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
} else {
- BinaryRow row = BaseRowSerializer.baseRowToBinary(input, type);
+ BinaryRow row = serializer.toBinaryRow(input);
writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java
new file mode 100644
index 0000000..75a4c1c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * An interface for array used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * After serialization, it becomes the {@link BinaryArray} format.
+ * Convenient updates use the {@link GenericArray} format.
+ */
+public interface BaseArray extends TypeGetterSetters {
+
+ int numElements();
+
+ boolean isNullAt(int pos);
+
+ void setNullAt(int pos);
+
+ void setNotNullAt(int pos);
+
+ void setNullLong(int pos);
+
+ void setNullInt(int pos);
+
+ void setNullBoolean(int pos);
+
+ void setNullByte(int pos);
+
+ void setNullShort(int pos);
+
+ void setNullFloat(int pos);
+
+ void setNullDouble(int pos);
+
+ boolean[] toBooleanArray();
+
+ byte[] toByteArray();
+
+ short[] toShortArray();
+
+ int[] toIntArray();
+
+ long[] toLongArray();
+
+ float[] toFloatArray();
+
+ double[] toDoubleArray();
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java
new file mode 100644
index 0000000..3ad542c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Map;
+
+/**
+ * An interface for map used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * After serialization, it becomes the {@link BinaryMap} format.
+ * Convenient updates use the {@link GenericMap} format.
+ */
+public interface BaseMap {
+
+ /**
+ * Invoked by code generators.
+ */
+ int numElements();
+
+ /**
+ * This method will return a Java map containing INTERNAL type data.
+ * If you want a Java map containing external type data, you have to use converters.
+ */
+ Map toJavaMap(LogicalType keyType, LogicalType valueType);
+
+ // NOTE:
+ //
+ // As binary map has specific `get` and `toString` method,
+ // we do not provide these methods in the interface.
+ // Instead, we implement them in codegen.
+ //
+ // `get` is implemented in ScalarOperatorGens -> generateMapGet()
+ // `toString` is implemented in ScalarOperatorGens -> generateCast()
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index fbf81b1..fbda19c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.util.SegmentsUtil;
+import java.lang.reflect.Array;
+
import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
/**
@@ -33,7 +35,7 @@ import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
*
* <p>{@code BinaryArray} are influenced by Apache Spark UnsafeArrayData.
*/
-public final class BinaryArray extends BinaryFormat implements TypeGetterSetters {
+public final class BinaryArray extends BinaryFormat implements BaseArray {
/**
* Offset for Arrays.
@@ -91,6 +93,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return elementOffset + ordinal * elementSize;
}
+ @Override
public int numElements() {
return numElements;
}
@@ -120,6 +123,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.bitSet(segments, offset + 4, pos);
}
+ @Override
public void setNotNullAt(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitUnSet(segments, offset + 4, pos);
@@ -138,6 +142,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setLong(segments, getElementOffset(pos, 8), value);
}
+ @Override
public void setNullLong(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -157,6 +162,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setInt(segments, getElementOffset(pos, 4), value);
}
+ @Override
public void setNullInt(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -204,13 +210,13 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
}
@Override
- public BinaryArray getArray(int pos) {
+ public BaseArray getArray(int pos) {
assertIndexIsValid(pos);
return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
}
@Override
- public BinaryMap getMap(int pos) {
+ public BaseMap getMap(int pos) {
assertIndexIsValid(pos);
return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
}
@@ -237,6 +243,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1), value);
}
+ @Override
public void setNullBoolean(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -256,6 +263,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setByte(segments, getElementOffset(pos, 1), value);
}
+ @Override
public void setNullByte(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -275,6 +283,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setShort(segments, getElementOffset(pos, 2), value);
}
+ @Override
public void setNullShort(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -294,6 +303,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setFloat(segments, getElementOffset(pos, 4), value);
}
+ @Override
public void setNullFloat(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -313,6 +323,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
SegmentsUtil.setDouble(segments, getElementOffset(pos, 8), value);
}
+ @Override
public void setNullDouble(int pos) {
assertIndexIsValid(pos);
SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -365,6 +376,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
}
}
+ @Override
public boolean[] toBooleanArray() {
checkNoNull();
boolean[] values = new boolean[numElements];
@@ -373,6 +385,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public byte[] toByteArray() {
checkNoNull();
byte[] values = new byte[numElements];
@@ -381,6 +394,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public short[] toShortArray() {
checkNoNull();
short[] values = new short[numElements];
@@ -389,6 +403,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public int[] toIntArray() {
checkNoNull();
int[] values = new int[numElements];
@@ -397,6 +412,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public long[] toLongArray() {
checkNoNull();
long[] values = new long[numElements];
@@ -405,6 +421,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public float[] toFloatArray() {
checkNoNull();
float[] values = new float[numElements];
@@ -413,6 +430,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ @Override
public double[] toDoubleArray() {
checkNoNull();
double[] values = new double[numElements];
@@ -421,6 +439,17 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
return values;
}
+ public <T> T[] toClassArray(LogicalType elementType, Class<T> elementClass) {
+ int size = numElements();
+ T[] values = (T[]) Array.newInstance(elementClass, size);
+ for (int i = 0; i < size; i++) {
+ if (!isNullAt(i)) {
+ values[i] = (T) TypeGetterSetters.get(this, i, elementType);
+ }
+ }
+ return values;
+ }
+
public BinaryArray copy() {
return copy(new BinaryArray());
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
index 590a422..ee0529a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
@@ -208,4 +208,8 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter {
public void complete() {
array.pointTo(segment, 0, cursor);
}
+
+ public int getNumElements() {
+ return numElements;
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
index 272fd27..8da32db 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.dataformat;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.util.SegmentsUtil;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -29,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
*/
-public final class BinaryMap extends BinaryFormat {
+public final class BinaryMap extends BinaryFormat implements BaseMap {
private final BinaryArray keys;
private final BinaryArray values;
@@ -69,6 +74,18 @@ public final class BinaryMap extends BinaryFormat {
return values;
}
+ @Override
+ public Map<Object, Object> toJavaMap(LogicalType keyType, LogicalType valueType) {
+ Object[] keyArray = keys.toClassArray(keyType, getInternalClassForType(keyType));
+ Object[] valueArray = values.toClassArray(valueType, getInternalClassForType(valueType));
+
+ Map<Object, Object> map = new HashMap<>();
+ for (int i = 0; i < keyArray.length; i++) {
+ map.put(keyArray[i], valueArray[i]);
+ }
+ return map;
+ }
+
public BinaryMap copy() {
return copy(new BinaryMap());
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index 358b1fc..5d67d2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -318,13 +318,13 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
}
@Override
- public BinaryArray getArray(int pos) {
+ public BaseArray getArray(int pos) {
assertIndexIsValid(pos);
return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
}
@Override
- public BinaryMap getMap(int pos) {
+ public BaseMap getMap(int pos) {
assertIndexIsValid(pos);
return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 16fef15..ea97673 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -17,9 +17,12 @@
package org.apache.flink.table.dataformat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
/**
* Writer to write a composite data format, like row, array.
@@ -59,11 +62,11 @@ public interface BinaryWriter {
void writeDecimal(int pos, Decimal value, int precision);
- void writeArray(int pos, BinaryArray value);
+ void writeArray(int pos, BaseArray value, BaseArraySerializer serializer);
- void writeMap(int pos, BinaryMap value);
+ void writeMap(int pos, BaseMap value, BaseMapSerializer serializer);
- void writeRow(int pos, BaseRow value, RowType type);
+ void writeRow(int pos, BaseRow value, BaseRowSerializer type);
void writeGeneric(int pos, BinaryGeneric value);
@@ -72,7 +75,8 @@ public interface BinaryWriter {
*/
void complete();
- static void write(BinaryWriter writer, int pos, Object o, LogicalType type) {
+ static void write(BinaryWriter writer, int pos,
+ Object o, LogicalType type, TypeSerializer serializer) {
switch (type.getTypeRoot()) {
case BOOLEAN:
writer.writeBoolean(pos, (boolean) o);
@@ -109,15 +113,14 @@ public interface BinaryWriter {
writer.writeDecimal(pos, (Decimal) o, decimalType.getPrecision());
break;
case ARRAY:
- writer.writeArray(pos, (BinaryArray) o);
+ writer.writeArray(pos, (BaseArray) o, (BaseArraySerializer) serializer);
break;
case MAP:
case MULTISET:
- writer.writeMap(pos, (BinaryMap) o);
+ writer.writeMap(pos, (BaseMap) o, (BaseMapSerializer) serializer);
break;
case ROW:
- RowType rowType = (RowType) type;
- writer.writeRow(pos, (BaseRow) o, rowType);
+ writer.writeRow(pos, (BaseRow) o, (BaseRowSerializer) serializer);
break;
case ANY:
writer.writeGeneric(pos, (BinaryGeneric) o);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
index 8764c98..3fed637 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
@@ -140,13 +140,13 @@ public final class ColumnarRow implements BaseRow {
}
@Override
- public BinaryArray getArray(int ordinal) {
+ public BaseArray getArray(int ordinal) {
// TODO
throw new UnsupportedOperationException("Array is not supported.");
}
@Override
- public BinaryMap getMap(int ordinal) {
+ public BaseMap getMap(int ordinal) {
// TODO
throw new UnsupportedOperationException("Map is not supported.");
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index 35f9b70..af32a51 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.InternalSerializers;
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.types.logical.DecimalType;
@@ -44,6 +45,8 @@ import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
import org.apache.flink.types.Row;
+import org.apache.commons.lang3.ArrayUtils;
+
import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
@@ -233,7 +236,7 @@ public class DataFormatConverters {
}
return new GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
default:
- throw new RuntimeException("Not support dataType: " + originDataType);
+ throw new RuntimeException("Not support dataType: " + dataType);
}
}
@@ -466,7 +469,7 @@ public class DataFormatConverters {
/**
* Converter for BinaryArray.
*/
- public static final class BinaryArrayConverter extends IdentityConverter<BinaryArray> {
+ public static final class BinaryArrayConverter extends IdentityConverter<BaseArray> {
private static final long serialVersionUID = -7790350668043604641L;
@@ -475,7 +478,7 @@ public class DataFormatConverters {
private BinaryArrayConverter() {}
@Override
- BinaryArray toExternalImpl(BaseRow row, int column) {
+ BaseArray toExternalImpl(BaseRow row, int column) {
return row.getArray(column);
}
}
@@ -483,7 +486,7 @@ public class DataFormatConverters {
/**
* Converter for BinaryMap.
*/
- public static final class BinaryMapConverter extends IdentityConverter<BinaryMap> {
+ public static final class BinaryMapConverter extends IdentityConverter<BaseMap> {
private static final long serialVersionUID = -9114231688474126815L;
@@ -492,7 +495,7 @@ public class DataFormatConverters {
private BinaryMapConverter() {}
@Override
- BinaryMap toExternalImpl(BaseRow row, int column) {
+ BaseMap toExternalImpl(BaseRow row, int column) {
return row.getMap(column);
}
}
@@ -706,7 +709,7 @@ public class DataFormatConverters {
/**
* Converter for primitive int array.
*/
- public static final class PrimitiveIntArrayConverter extends DataFormatConverter<BinaryArray, int[]> {
+ public static final class PrimitiveIntArrayConverter extends DataFormatConverter<BaseArray, int[]> {
private static final long serialVersionUID = 1780941126232395638L;
@@ -715,12 +718,12 @@ public class DataFormatConverters {
private PrimitiveIntArrayConverter() {}
@Override
- BinaryArray toInternalImpl(int[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(int[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- int[] toExternalImpl(BinaryArray value) {
+ int[] toExternalImpl(BaseArray value) {
return value.toIntArray();
}
@@ -733,7 +736,7 @@ public class DataFormatConverters {
/**
* Converter for primitive boolean array.
*/
- public static final class PrimitiveBooleanArrayConverter extends DataFormatConverter<BinaryArray, boolean[]> {
+ public static final class PrimitiveBooleanArrayConverter extends DataFormatConverter<BaseArray, boolean[]> {
private static final long serialVersionUID = -4037693692440282141L;
@@ -742,12 +745,12 @@ public class DataFormatConverters {
private PrimitiveBooleanArrayConverter() {}
@Override
- BinaryArray toInternalImpl(boolean[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(boolean[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- boolean[] toExternalImpl(BinaryArray value) {
+ boolean[] toExternalImpl(BaseArray value) {
return value.toBooleanArray();
}
@@ -777,7 +780,7 @@ public class DataFormatConverters {
/**
* Converter for primitive short array.
*/
- public static final class PrimitiveShortArrayConverter extends DataFormatConverter<BinaryArray, short[]> {
+ public static final class PrimitiveShortArrayConverter extends DataFormatConverter<BaseArray, short[]> {
private static final long serialVersionUID = -1343184089311186834L;
@@ -786,12 +789,12 @@ public class DataFormatConverters {
private PrimitiveShortArrayConverter() {}
@Override
- BinaryArray toInternalImpl(short[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(short[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- short[] toExternalImpl(BinaryArray value) {
+ short[] toExternalImpl(BaseArray value) {
return value.toShortArray();
}
@@ -804,7 +807,7 @@ public class DataFormatConverters {
/**
* Converter for primitive long array.
*/
- public static final class PrimitiveLongArrayConverter extends DataFormatConverter<BinaryArray, long[]> {
+ public static final class PrimitiveLongArrayConverter extends DataFormatConverter<BaseArray, long[]> {
private static final long serialVersionUID = 4061982985342526078L;
@@ -813,12 +816,12 @@ public class DataFormatConverters {
private PrimitiveLongArrayConverter() {}
@Override
- BinaryArray toInternalImpl(long[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(long[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- long[] toExternalImpl(BinaryArray value) {
+ long[] toExternalImpl(BaseArray value) {
return value.toLongArray();
}
@@ -831,7 +834,7 @@ public class DataFormatConverters {
/**
* Converter for primitive float array.
*/
- public static final class PrimitiveFloatArrayConverter extends DataFormatConverter<BinaryArray, float[]> {
+ public static final class PrimitiveFloatArrayConverter extends DataFormatConverter<BaseArray, float[]> {
private static final long serialVersionUID = -3237695040861141459L;
@@ -840,12 +843,12 @@ public class DataFormatConverters {
private PrimitiveFloatArrayConverter() {}
@Override
- BinaryArray toInternalImpl(float[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(float[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- float[] toExternalImpl(BinaryArray value) {
+ float[] toExternalImpl(BaseArray value) {
return value.toFloatArray();
}
@@ -858,7 +861,7 @@ public class DataFormatConverters {
/**
* Converter for primitive double array.
*/
- public static final class PrimitiveDoubleArrayConverter extends DataFormatConverter<BinaryArray, double[]> {
+ public static final class PrimitiveDoubleArrayConverter extends DataFormatConverter<BaseArray, double[]> {
private static final long serialVersionUID = 6333670535356315691L;
@@ -867,12 +870,12 @@ public class DataFormatConverters {
private PrimitiveDoubleArrayConverter() {}
@Override
- BinaryArray toInternalImpl(double[] value) {
- return BinaryArray.fromPrimitiveArray(value);
+ BaseArray toInternalImpl(double[] value) {
+ return new GenericArray(value, value.length, true);
}
@Override
- double[] toExternalImpl(BinaryArray value) {
+ double[] toExternalImpl(BaseArray value) {
return value.toDoubleArray();
}
@@ -885,7 +888,7 @@ public class DataFormatConverters {
/**
* Converter for object array.
*/
- public static final class ObjectArrayConverter<T> extends DataFormatConverter<BinaryArray, T[]> {
+ public static final class ObjectArrayConverter<T> extends DataFormatConverter<BaseArray, T[]> {
private static final long serialVersionUID = -7434682160639380078L;
@@ -893,33 +896,52 @@ public class DataFormatConverters {
private final LogicalType elementType;
private final DataFormatConverter<Object, T> elementConverter;
private final int elementSize;
+ private final TypeSerializer<T> eleSer;
+ private final boolean isEleIndentity;
+
+ private transient BinaryArray reuseArray;
+ private transient BinaryArrayWriter reuseWriter;
public ObjectArrayConverter(DataType elementType) {
this.componentClass = (Class) elementType.getConversionClass();
this.elementType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(elementType);
this.elementConverter = DataFormatConverters.getConverterForDataType(elementType);
- this.elementSize = BinaryArray.calculateFixLengthPartSize(elementType.getLogicalType());
+ this.elementSize = BinaryArray.calculateFixLengthPartSize(this.elementType);
+ this.eleSer = InternalSerializers.create(this.elementType, new ExecutionConfig());
+ this.isEleIndentity = elementConverter instanceof IdentityConverter;
}
@Override
- BinaryArray toInternalImpl(T[] value) {
- BinaryArray array = new BinaryArray();
- BinaryArrayWriter writer = new BinaryArrayWriter(array, value.length, elementSize);
+ BaseArray toInternalImpl(T[] value) {
+ return isEleIndentity ? new GenericArray(value, value.length) : toBinaryArray(value);
+ }
+
+ private BaseArray toBinaryArray(T[] value) {
+ if (reuseArray == null) {
+ reuseArray = new BinaryArray();
+ }
+ if (reuseWriter == null || reuseWriter.getNumElements() != value.length) {
+ reuseWriter = new BinaryArrayWriter(reuseArray, value.length, elementSize);
+ } else {
+ reuseWriter.reset();
+ }
for (int i = 0; i < value.length; i++) {
Object field = value[i];
if (field == null) {
- writer.setNullAt(i, elementType);
+ reuseWriter.setNullAt(i, elementType);
} else {
- BinaryWriter.write(writer, i, elementConverter.toInternalImpl(value[i]), elementType);
+ BinaryWriter.write(reuseWriter, i, elementConverter.toInternalImpl(value[i]), elementType, eleSer);
}
}
- writer.complete();
- return array;
+ reuseWriter.complete();
+ return reuseArray;
}
@Override
- T[] toExternalImpl(BinaryArray value) {
- return binaryArrayToJavaArray(value, elementType, componentClass, elementConverter);
+ T[] toExternalImpl(BaseArray value) {
+ return (isEleIndentity && value instanceof GenericArray) ?
+ genericArrayToJavaArray((GenericArray) value, elementType) :
+ binaryArrayToJavaArray((BinaryArray) value, elementType, componentClass, elementConverter);
}
@Override
@@ -928,6 +950,32 @@ public class DataFormatConverters {
}
}
+ private static <T> T[] genericArrayToJavaArray(GenericArray value, LogicalType eleType) {
+ Object array = value.getArray();
+ if (value.isPrimitiveArray()) {
+ switch (eleType.getTypeRoot()) {
+ case BOOLEAN:
+ return (T[]) ArrayUtils.toObject((boolean[]) array);
+ case TINYINT:
+ return (T[]) ArrayUtils.toObject((byte[]) array);
+ case SMALLINT:
+ return (T[]) ArrayUtils.toObject((short[]) array);
+ case INTEGER:
+ return (T[]) ArrayUtils.toObject((int[]) array);
+ case BIGINT:
+ return (T[]) ArrayUtils.toObject((long[]) array);
+ case FLOAT:
+ return (T[]) ArrayUtils.toObject((float[]) array);
+ case DOUBLE:
+ return (T[]) ArrayUtils.toObject((double[]) array);
+ default:
+ throw new RuntimeException("Not a primitive type: " + eleType);
+ }
+ } else {
+ return (T[]) array;
+ }
+ }
+
private static <T> T[] binaryArrayToJavaArray(BinaryArray value, LogicalType elementType,
Class<T> componentClass, DataFormatConverter<Object, T> elementConverter) {
int size = value.numElements();
@@ -946,7 +994,7 @@ public class DataFormatConverters {
/**
* Converter for map.
*/
- public static final class MapConverter extends DataFormatConverter<BinaryMap, Map> {
+ public static final class MapConverter extends DataFormatConverter<BaseMap, Map> {
private static final long serialVersionUID = -916429669828309919L;
@@ -962,6 +1010,16 @@ public class DataFormatConverters {
private final Class keyComponentClass;
private final Class valueComponentClass;
+ private final TypeSerializer keySer;
+ private final TypeSerializer valueSer;
+
+ private final boolean isKeyValueIndentity;
+
+ private transient BinaryArray reuseKArray;
+ private transient BinaryArrayWriter reuseKWriter;
+ private transient BinaryArray reuseVArray;
+ private transient BinaryArrayWriter reuseVWriter;
+
public MapConverter(DataType keyTypeInfo, DataType valueTypeInfo) {
this.keyType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(keyTypeInfo);
this.valueType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(valueTypeInfo);
@@ -971,38 +1029,58 @@ public class DataFormatConverters {
this.valueElementSize = BinaryArray.calculateFixLengthPartSize(valueType);
this.keyComponentClass = keyTypeInfo.getConversionClass();
this.valueComponentClass = valueTypeInfo.getConversionClass();
+ this.isKeyValueIndentity = keyConverter instanceof IdentityConverter &&
+ valueConverter instanceof IdentityConverter;
+ this.keySer = InternalSerializers.create(this.keyType, new ExecutionConfig());
+ this.valueSer = InternalSerializers.create(this.valueType, new ExecutionConfig());
}
@Override
- BinaryMap toInternalImpl(Map value) {
- BinaryArray keyArray = new BinaryArray();
- BinaryArrayWriter keyWriter = new BinaryArrayWriter(keyArray, value.size(), keyElementSize);
+ BaseMap toInternalImpl(Map value) {
+ return isKeyValueIndentity ? new GenericMap(value) : toBinaryMap(value);
+ }
- BinaryArray valueArray = new BinaryArray();
- BinaryArrayWriter valueWriter = new BinaryArrayWriter(valueArray, value.size(), valueElementSize);
+ private BinaryMap toBinaryMap(Map value) {
+ if (reuseKArray == null) {
+ reuseKArray = new BinaryArray();
+ reuseVArray = new BinaryArray();
+ }
+ if (reuseKWriter == null || reuseKWriter.getNumElements() != value.size()) {
+ reuseKWriter = new BinaryArrayWriter(reuseKArray, value.size(), keyElementSize);
+ reuseVWriter = new BinaryArrayWriter(reuseVArray, value.size(), valueElementSize);
+ } else {
+ reuseKWriter.reset();
+ reuseVWriter.reset();
+ }
int i = 0;
for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) value).entrySet()) {
if (entry.getKey() == null) {
- keyWriter.setNullAt(i, keyType);
+ reuseKWriter.setNullAt(i, keyType);
} else {
- BinaryWriter.write(keyWriter, i, keyConverter.toInternalImpl(entry.getKey()), keyType);
+ BinaryWriter.write(reuseKWriter, i, keyConverter.toInternalImpl(entry.getKey()), keyType, keySer);
}
if (entry.getValue() == null) {
- valueWriter.setNullAt(i, valueType);
+ reuseVWriter.setNullAt(i, valueType);
} else {
- BinaryWriter.write(valueWriter, i, valueConverter.toInternalImpl(entry.getValue()), valueType);
+ BinaryWriter.write(reuseVWriter, i, valueConverter.toInternalImpl(entry.getValue()), valueType, valueSer);
}
i++;
}
- keyWriter.complete();
- valueWriter.complete();
- return BinaryMap.valueOf(keyArray, valueArray);
+ reuseKWriter.complete();
+ reuseVWriter.complete();
+ return BinaryMap.valueOf(reuseKArray, reuseVArray);
}
@Override
- Map toExternalImpl(BinaryMap value) {
+ Map toExternalImpl(BaseMap value) {
+ return (isKeyValueIndentity && value instanceof GenericMap) ?
+ ((GenericMap) value).getMap() :
+ binaryMapToMap((BinaryMap) value);
+ }
+
+ private Map binaryMapToMap(BinaryMap value) {
Map<Object, Object> map = new HashMap<>();
Object[] keys = binaryArrayToJavaArray(value.keyArray(), keyType, keyComponentClass, keyConverter);
Object[] values = binaryArrayToJavaArray(value.valueArray(), valueType, valueComponentClass, valueConverter);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java
new file mode 100644
index 0000000..94077be
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A GenericArray is an array in which all elements have the same type.
+ * It can be considered a wrapper class around a normal Java array.
+ */
+public class GenericArray implements BaseArray {
+
+ private final Object arr;
+ private final int numElements;
+ private final boolean isPrimitiveArray;
+
+ public GenericArray(Object arr, int numElements) {
+ this(arr, numElements, isPrimitiveArray(arr));
+ }
+
+ public GenericArray(Object arr, int numElements, boolean isPrimitiveArray) {
+ this.arr = arr;
+ this.numElements = numElements;
+ this.isPrimitiveArray = isPrimitiveArray;
+ }
+
+ private static boolean isPrimitiveArray(Object arr) {
+ checkNotNull(arr);
+ checkArgument(arr.getClass().isArray());
+ return arr.getClass().getComponentType().isPrimitive();
+ }
+
+ public boolean isPrimitiveArray() {
+ return isPrimitiveArray;
+ }
+
+ public Object getArray() {
+ return arr;
+ }
+
+ @Override
+ public int numElements() {
+ return numElements;
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return !isPrimitiveArray && ((Object[]) arr)[pos] == null;
+ }
+
+ @Override
+ public void setNullAt(int pos) {
+ checkState(!isPrimitiveArray, "Can't set null for primitive array");
+ ((Object[]) arr)[pos] = null;
+ }
+
+ @Override
+ public void setNotNullAt(int pos) {
+ // do nothing, as an update will follow immediately
+ }
+
+ @Override
+ public void setNullLong(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullInt(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullBoolean(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullByte(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullShort(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullFloat(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public void setNullDouble(int pos) {
+ setNullAt(pos);
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ return (boolean[]) arr;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return (byte[]) arr;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ return (short[]) arr;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ return (int[]) arr;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ return (long[]) arr;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ return (float[]) arr;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ return (double[]) arr;
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return isPrimitiveArray ? ((boolean[]) arr)[pos] : (boolean) getObject(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return isPrimitiveArray ? ((byte[]) arr)[pos] : (byte) getObject(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return isPrimitiveArray ? ((short[]) arr)[pos] : (short) getObject(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return isPrimitiveArray ? ((int[]) arr)[pos] : (int) getObject(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return isPrimitiveArray ? ((long[]) arr)[pos] : (long) getObject(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return isPrimitiveArray ? ((float[]) arr)[pos] : (float) getObject(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return isPrimitiveArray ? ((double[]) arr)[pos] : (double) getObject(pos);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return (byte[]) getObject(pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return (BinaryString) getObject(pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return (Decimal) getObject(pos);
+ }
+
+ @Override
+ public <T> BinaryGeneric<T> getGeneric(int pos) {
+ return (BinaryGeneric) getObject(pos);
+ }
+
+ @Override
+ public BaseRow getRow(int pos, int numFields) {
+ return (BaseRow) getObject(pos);
+ }
+
+ @Override
+ public BaseArray getArray(int pos) {
+ return (BaseArray) getObject(pos);
+ }
+
+ @Override
+ public BaseMap getMap(int pos) {
+ return (BaseMap) getObject(pos);
+ }
+
+ @Override
+ public void setBoolean(int pos, boolean value) {
+ if (isPrimitiveArray) {
+ ((boolean[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setByte(int pos, byte value) {
+ if (isPrimitiveArray) {
+ ((byte[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setShort(int pos, short value) {
+ if (isPrimitiveArray) {
+ ((short[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setInt(int pos, int value) {
+ if (isPrimitiveArray) {
+ ((int[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setLong(int pos, long value) {
+ if (isPrimitiveArray) {
+ ((long[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setFloat(int pos, float value) {
+ if (isPrimitiveArray) {
+ ((float[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setDouble(int pos, double value) {
+ if (isPrimitiveArray) {
+ ((double[]) arr)[pos] = value;
+ } else {
+ setObject(pos, value);
+ }
+ }
+
+ @Override
+ public void setDecimal(int pos, Decimal value, int precision) {
+ setObject(pos, value);
+ }
+
+ public Object getObject(int pos) {
+ return ((Object[]) arr)[pos];
+ }
+
+ public void setObject(int pos, Object value) {
+ ((Object[]) arr)[pos] = value;
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
similarity index 53%
copy from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
copy to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
index 8fdb7b0..fb0217b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
@@ -16,38 +16,44 @@
* limitations under the License.
*/
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataformat;
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Map;
/**
- * A test for the {@link BinaryArraySerializer}.
+ * A GenericMap is a map in which all keys share one type and all values share one type.
+ * It can be considered a wrapper class around a normal Java map.
*/
-public class DecimalSerializerTest extends SerializerTestBase<Decimal> {
+public class GenericMap implements BaseMap {
- @Override
- protected DecimalSerializer createSerializer() {
- return new DecimalSerializer(5, 2);
+ private final Map<Object, Object> map;
+
+ public GenericMap(Map<Object, Object> map) {
+ this.map = map;
}
@Override
- protected int getLength() {
- return -1;
+ public int numElements() {
+ return map.size();
}
@Override
- protected Class<Decimal> getTypeClass() {
- return Decimal.class;
+ public Map toJavaMap(LogicalType keyType, LogicalType valueType) {
+ return map;
+ }
+
+ public Object get(Object key) {
+ return map.get(key);
}
@Override
- protected Decimal[] getTestData() {
- return new Decimal[] {
- Decimal.fromLong(1, 5, 2),
- Decimal.fromLong(2, 5, 2),
- Decimal.fromLong(3, 5, 2),
- Decimal.fromLong(4, 5, 2)
- };
+ public String toString() {
+ return map.toString();
+ }
+
+ public Map<Object, Object> getMap() {
+ return map;
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
index a92ca48..92e29a5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
@@ -172,7 +172,7 @@ public final class JoinedRow implements BaseRow {
}
@Override
- public BinaryArray getArray(int i) {
+ public BaseArray getArray(int i) {
if (i < row1.getArity()) {
return row1.getArray(i);
} else {
@@ -181,7 +181,7 @@ public final class JoinedRow implements BaseRow {
}
@Override
- public BinaryMap getMap(int i) {
+ public BaseMap getMap(int i) {
if (i < row1.getArity()) {
return row1.getMap(i);
} else {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index c211e52..1ba5592 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -258,13 +258,13 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
}
@Override
- public BinaryArray getArray(int pos) {
+ public BaseArray getArray(int pos) {
assertIndexIsValid(pos);
return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
}
@Override
- public BinaryMap getMap(int pos) {
+ public BaseMap getMap(int pos) {
assertIndexIsValid(pos);
return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
index a2be7b2..b6f4799 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
@@ -76,13 +76,13 @@ public abstract class ObjectArrayRow implements BaseRow {
}
@Override
- public BinaryArray getArray(int ordinal) {
- return (BinaryArray) this.fields[ordinal];
+ public BaseArray getArray(int ordinal) {
+ return (BaseArray) this.fields[ordinal];
}
@Override
- public BinaryMap getMap(int ordinal) {
- return (BinaryMap) this.fields[ordinal];
+ public BaseMap getMap(int ordinal) {
+ return (BaseMap) this.fields[ordinal];
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index 92217ed..29ed39c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -101,14 +101,14 @@ public interface TypeGetterSetters {
byte[] getBinary(int ordinal);
/**
- * Get array value, internal format is BinaryArray.
+ * Get array value, internal format is BaseArray.
*/
- BinaryArray getArray(int ordinal);
+ BaseArray getArray(int ordinal);
/**
- * Get map value, internal format is BinaryMap.
+ * Get map value, internal format is BaseMap.
*/
- BinaryMap getMap(int ordinal);
+ BaseMap getMap(int ordinal);
/**
* Get row value, internal format is BaseRow.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
index d385181..e67aa38 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
@@ -117,12 +117,12 @@ public final class UpdatableRow implements BaseRow {
}
@Override
- public BinaryArray getArray(int ordinal) {
- return updated[ordinal] ? (BinaryArray) fields[ordinal] : row.getArray(ordinal);
+ public BaseArray getArray(int ordinal) {
+ return updated[ordinal] ? (BaseArray) fields[ordinal] : row.getArray(ordinal);
}
@Override
- public BinaryMap getMap(int ordinal) {
+ public BaseMap getMap(int ordinal) {
return updated[ordinal] ? (BinaryMap) fields[ordinal] : row.getMap(ordinal);
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
index 9797091..eb96bdf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
@@ -243,7 +243,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
public void putBuildRow(BaseRow row) throws IOException {
final int hashCode = hash(this.buildSideProjection.apply(row).hashCode(), 0);
// TODO: combine key projection and build side conversion to code gen.
- insertIntoTable(originBuildSideSerializer.baseRowToBinary(row), hashCode);
+ insertIntoTable(originBuildSideSerializer.toBinaryRow(row), hashCode);
}
/**
@@ -290,7 +290,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
return true;
} else {
if (p.testHashBloomFilter(hash)) {
- BinaryRow row = originProbeSideSerializer.baseRowToBinary(record);
+ BinaryRow row = originProbeSideSerializer.toBinaryRow(record);
p.insertIntoProbeBuffer(row);
}
return false;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
index c091985..3ce0d0f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
@@ -96,7 +96,7 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
@Override
public void processElement(StreamRecord<BaseRow> element) throws Exception {
BaseRow originalInput = element.getValue();
- BinaryRow input = baseRowSerializer.baseRowToBinary(originalInput).copy();
+ BinaryRow input = baseRowSerializer.toBinaryRow(originalInput).copy();
BaseRowUtil.setAccumulate(input);
long count = inputBuffer.getOrDefault(input, 0L);
if (BaseRowUtil.isAccumulateMsg(originalInput)) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
index 9050435..a34d434 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
@@ -19,10 +19,10 @@
package org.apache.flink.table.types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BaseMap;
import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryGeneric;
-import org.apache.flink.table.dataformat.BinaryMap;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.types.logical.ArrayType;
@@ -160,10 +160,10 @@ public class ClassLogicalTypeConverter {
case DECIMAL:
return Decimal.class;
case ARRAY:
- return BinaryArray.class;
+ return BaseArray.class;
case MAP:
case MULTISET:
- return BinaryMap.class;
+ return BaseMap.class;
case ROW:
return BaseRow.class;
case BINARY:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index c871038..9f2ef5b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -28,14 +28,18 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
import org.apache.flink.table.typeutils.BaseRowSerializer;
-import org.apache.flink.table.typeutils.BinaryArraySerializer;
import org.apache.flink.table.typeutils.BinaryGenericSerializer;
-import org.apache.flink.table.typeutils.BinaryMapSerializer;
import org.apache.flink.table.typeutils.BinaryStringSerializer;
import org.apache.flink.table.typeutils.DecimalSerializer;
@@ -72,10 +76,12 @@ public class InternalSerializers {
DecimalType decimalType = (DecimalType) type;
return new DecimalSerializer(decimalType.getPrecision(), decimalType.getScale());
case ARRAY:
- return BinaryArraySerializer.INSTANCE;
+ return new BaseArraySerializer(((ArrayType) type).getElementType(), config);
case MAP:
+ MapType mapType = (MapType) type;
+ return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType());
case MULTISET:
- return BinaryMapSerializer.INSTANCE;
+ return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType());
case ROW:
RowType rowType = (RowType) type;
return new BaseRowSerializer(config, rowType);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index d233083..e0c9ebb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.typeutils.MapViewTypeInfo;
import org.apache.flink.types.Row;
import static org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
/**
* Converter between {@link TypeInformation} and {@link DataType}.
@@ -79,7 +80,8 @@ public class TypeInfoDataTypeConverter {
case VARBINARY: // ignore precision
return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
case ARRAY:
- if (dataType instanceof CollectionDataType) {
+ if (dataType instanceof CollectionDataType &&
+ !isPrimitive(((CollectionDataType) dataType).getElementDataType().getLogicalType())) {
return ObjectArrayTypeInfo.getInfoFor(
fromDataTypeToTypeInfo(((CollectionDataType) dataType).getElementDataType()));
} else {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
index 4d99b95..5043eb4 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
@@ -39,7 +39,7 @@ public abstract class AbstractRowSerializer<T extends BaseRow> extends TypeSeria
/**
* Convert a {@link BaseRow} to a {@link BinaryRow}.
*/
- public abstract BinaryRow baseRowToBinary(T baseRow) throws IOException;
+ public abstract BinaryRow toBinaryRow(T baseRow) throws IOException;
/**
* Serializes the given record to the given target paged output view. Some implementations may
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
new file mode 100644
index 0000000..d8bf95e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericArray;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+import static org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType;
+
+/**
+ * Serializer for {@link BaseArray}.
+ */
+public class BaseArraySerializer extends TypeSerializer<BaseArray> {
+
+ private final LogicalType eleType;
+ private final TypeSerializer eleSer;
+
+ private transient BinaryArray reuseArray;
+ private transient BinaryArrayWriter reuseWriter;
+
+ public BaseArraySerializer(LogicalType eleType, ExecutionConfig conf) {
+ this.eleType = eleType;
+ this.eleSer = InternalSerializers.create(eleType, conf);
+ }
+
+ private BaseArraySerializer(LogicalType eleType, TypeSerializer eleSer) {
+ this.eleType = eleType;
+ this.eleSer = eleSer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<BaseArray> duplicate() {
+ return new BaseArraySerializer(eleType, eleSer.duplicate());
+ }
+
+ @Override
+ public BaseArray createInstance() {
+ return new BinaryArray();
+ }
+
+ @Override
+ public BaseArray copy(BaseArray from) {
+ return from instanceof GenericArray ?
+ copyGenericArray((GenericArray) from) :
+ ((BinaryArray) from).copy();
+ }
+
+ @Override
+ public BaseArray copy(BaseArray from, BaseArray reuse) {
+ return copy(from);
+ }
+
+ private GenericArray copyGenericArray(GenericArray array) {
+ Object arr;
+ if (array.isPrimitiveArray()) {
+ switch (eleType.getTypeRoot()) {
+ case BOOLEAN:
+ arr = Arrays.copyOf((boolean[]) array.getArray(), array.numElements());
+ break;
+ case TINYINT:
+ arr = Arrays.copyOf((byte[]) array.getArray(), array.numElements());
+ break;
+ case SMALLINT:
+ arr = Arrays.copyOf((short[]) array.getArray(), array.numElements());
+ break;
+ case INTEGER:
+ arr = Arrays.copyOf((int[]) array.getArray(), array.numElements());
+ break;
+ case BIGINT:
+ arr = Arrays.copyOf((long[]) array.getArray(), array.numElements());
+ break;
+ case FLOAT:
+ arr = Arrays.copyOf((float[]) array.getArray(), array.numElements());
+ break;
+ case DOUBLE:
+ arr = Arrays.copyOf((double[]) array.getArray(), array.numElements());
+ break;
+ default:
+ throw new RuntimeException("Unknown type: " + eleType);
+ }
+ } else {
+ Object[] objectArray = (Object[]) array.getArray();
+ Object[] newArray = (Object[]) Array.newInstance(
+ getInternalClassForType(eleType),
+ array.numElements());
+ for (int i = 0; i < array.numElements(); i++) {
+ newArray[i] = eleSer.copy(objectArray[i]);
+ }
+ arr = newArray;
+ }
+ return new GenericArray(arr, array.numElements(), array.isPrimitiveArray());
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(BaseArray record, DataOutputView target) throws IOException {
+ BinaryArray binaryArray = toBinaryArray(record);
+ target.writeInt(binaryArray.getSizeInBytes());
+ SegmentsUtil.copyToView(binaryArray.getSegments(), binaryArray.getOffset(), binaryArray.getSizeInBytes(), target);
+ }
+
+ public BinaryArray toBinaryArray(BaseArray from) {
+ if (from instanceof BinaryArray) {
+ return (BinaryArray) from;
+ }
+
+ int numElements = from.numElements();
+ if (reuseArray == null) {
+ reuseArray = new BinaryArray();
+ }
+ if (reuseWriter == null || reuseWriter.getNumElements() != numElements) {
+ reuseWriter = new BinaryArrayWriter(
+ reuseArray, numElements, BinaryArray.calculateFixLengthPartSize(eleType));
+ } else {
+ reuseWriter.reset();
+ }
+
+ for (int i = 0; i < numElements; i++) {
+ if (from.isNullAt(i)) {
+ reuseWriter.setNullAt(i, eleType);
+ } else {
+ BinaryWriter.write(reuseWriter, i, TypeGetterSetters.get(from, i, eleType), eleType, eleSer);
+ }
+ }
+ reuseWriter.complete();
+
+ return reuseArray;
+ }
+
+ @Override
+ public BaseArray deserialize(DataInputView source) throws IOException {
+ return deserializeReuse(new BinaryArray(), source);
+ }
+
+ @Override
+ public BaseArray deserialize(BaseArray reuse, DataInputView source) throws IOException {
+ return deserializeReuse(reuse instanceof GenericArray ? new BinaryArray() : (BinaryArray) reuse, source);
+ }
+
+ private BinaryArray deserializeReuse(BinaryArray reuse, DataInputView source) throws IOException {
+ int length = source.readInt();
+ byte[] bytes = new byte[length];
+ source.readFully(bytes);
+ reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int length = source.readInt();
+ target.writeInt(length);
+ target.write(source, length);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BaseArraySerializer that = (BaseArraySerializer) o;
+
+ return eleType.equals(that.eleType);
+ }
+
+ @Override
+ public int hashCode() {
+ return eleType.hashCode();
+ }
+
+ @Override
+ public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() {
+ return new BaseArraySerializerSnapshot(eleType);
+ }
+
+ /**
+ * {@link TypeSerializerSnapshot} for {@link BaseArraySerializer}.
+ */
+ public static final class BaseArraySerializerSnapshot implements TypeSerializerSnapshot<BaseArray> {
+ private static final int CURRENT_VERSION = 3;
+
+ private LogicalType previousType;
+
+ @SuppressWarnings("unused")
+ public BaseArraySerializerSnapshot() {
+ // this constructor is used when restoring from a checkpoint/savepoint.
+ }
+
+ BaseArraySerializerSnapshot(LogicalType eleType) {
+ this.previousType = eleType;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousType);
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+ try {
+ this.previousType = InstantiationUtil.deserializeObject(
+ new DataInputViewStream(in), userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public TypeSerializer<BaseArray> restoreSerializer() {
+ return new BaseArraySerializer(previousType, new ExecutionConfig());
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility<BaseArray> resolveSchemaCompatibility(TypeSerializer<BaseArray> newSerializer) {
+ if (!(newSerializer instanceof BaseArraySerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ BaseArraySerializer newBaseArraySerializer = (BaseArraySerializer) newSerializer;
+ if (!previousType.equals(newBaseArraySerializer.eleType)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
new file mode 100644
index 0000000..01dba56
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer<BaseMap> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseMapSerializer.class);
+
+ private final LogicalType keyType;
+ private final LogicalType valueType;
+
+ private final TypeSerializer keySerializer;
+ private final TypeSerializer valueSerializer;
+
+ private transient BinaryArray reuseKeyArray;
+ private transient BinaryArray reuseValueArray;
+ private transient BinaryArrayWriter reuseKeyWriter;
+ private transient BinaryArrayWriter reuseValueWriter;
+
+ public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+
+ this.keySerializer = InternalSerializers.create(keyType, new ExecutionConfig());
+ this.valueSerializer = InternalSerializers.create(valueType, new ExecutionConfig());
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<BaseMap> duplicate() {
+ return new BaseMapSerializer(keyType, valueType);
+ }
+
+ @Override
+ public BaseMap createInstance() {
+ return new BinaryMap();
+ }
+
+ /**
+ * NOTE: the map passed in should be a HashMap; when the key/value pairs of a TreeMap
+ * (ordered by a comparator) are inserted into a HashMap, problems may occur.
+ */
+ @Override
+ public BaseMap copy(BaseMap from) {
+ if (from instanceof GenericMap) {
+ Map<Object, Object> fromMap = ((GenericMap) from).getMap();
+ if (!(fromMap instanceof HashMap) && LOG.isDebugEnabled()) {
+ LOG.debug("It is dangerous to copy a non-HashMap to a HashMap.");
+ }
+ HashMap<Object, Object> toMap = new HashMap<>();
+ for (Map.Entry<Object, Object> entry : fromMap.entrySet()) {
+ toMap.put(
+ keySerializer.copy(entry.getKey()),
+ valueSerializer.copy(entry.getValue()));
+ }
+ return new GenericMap(toMap);
+ } else {
+ return ((BinaryMap) from).copy();
+ }
+ }
+
+ @Override
+ public BaseMap copy(BaseMap from, BaseMap reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(BaseMap record, DataOutputView target) throws IOException {
+ BinaryMap binaryMap = toBinaryMap(record);
+ target.writeInt(binaryMap.getSizeInBytes());
+ SegmentsUtil.copyToView(binaryMap.getSegments(), binaryMap.getOffset(), binaryMap.getSizeInBytes(), target);
+ }
+
+ public BinaryMap toBinaryMap(BaseMap from) {
+ if (from instanceof BinaryMap) {
+ return (BinaryMap) from;
+ }
+
+ Map<Object, Object> javaMap = ((GenericMap) from).getMap();
+ int numElements = javaMap.size();
+ if (reuseKeyArray == null) {
+ reuseKeyArray = new BinaryArray();
+ }
+ if (reuseValueArray == null) {
+ reuseValueArray = new BinaryArray();
+ }
+ if (reuseKeyWriter == null || reuseKeyWriter.getNumElements() != numElements) {
+ reuseKeyWriter = new BinaryArrayWriter(
+ reuseKeyArray, numElements, BinaryArray.calculateFixLengthPartSize(keyType));
+ } else {
+ reuseKeyWriter.reset();
+ }
+ if (reuseValueWriter == null || reuseValueWriter.getNumElements() != numElements) {
+ reuseValueWriter = new BinaryArrayWriter(
+ reuseValueArray, numElements, BinaryArray.calculateFixLengthPartSize(valueType));
+ } else {
+ reuseValueWriter.reset();
+ }
+
+ int i = 0;
+ for (Map.Entry<Object, Object> entry : javaMap.entrySet()) {
+ if (entry.getKey() == null) {
+ reuseKeyWriter.setNullAt(i, keyType);
+ } else {
+ BinaryWriter.write(reuseKeyWriter, i, entry.getKey(), keyType, keySerializer);
+ }
+ if (entry.getValue() == null) {
+ reuseValueWriter.setNullAt(i, valueType);
+ } else {
+ BinaryWriter.write(reuseValueWriter, i, entry.getValue(), valueType, valueSerializer);
+ }
+ i++;
+ }
+ reuseKeyWriter.complete();
+ reuseValueWriter.complete();
+
+ return BinaryMap.valueOf(reuseKeyArray, reuseValueArray);
+ }
+
+ @Override
+ public BaseMap deserialize(DataInputView source) throws IOException {
+ return deserializeReuse(new BinaryMap(), source);
+ }
+
+ @Override
+ public BaseMap deserialize(BaseMap reuse, DataInputView source) throws IOException {
+ return deserializeReuse(reuse instanceof GenericMap ? new BinaryMap() : (BinaryMap) reuse, source);
+ }
+
+ private BinaryMap deserializeReuse(BinaryMap reuse, DataInputView source) throws IOException {
+ int length = source.readInt();
+ byte[] bytes = new byte[length];
+ source.readFully(bytes);
+ reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+ return reuse;
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int length = source.readInt();
+ target.writeInt(length);
+ target.write(source, length);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BaseMapSerializer that = (BaseMapSerializer) o;
+
+ return keyType.equals(that.keyType) && valueType.equals(that.valueType);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = keyType.hashCode();
+ result = 31 * result + valueType.hashCode();
+ return result;
+ }
+
+ @Override
+ public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() {
+ return new BaseMapSerializerSnapshot(keyType, valueType);
+ }
+
+ /**
+ * {@link TypeSerializerSnapshot} for {@link BaseMapSerializer}.
+ */
+ public static final class BaseMapSerializerSnapshot implements TypeSerializerSnapshot<BaseMap> {
+ private static final int CURRENT_VERSION = 3;
+
+ private LogicalType previousKeyType;
+ private LogicalType previousValueType;
+
+ @SuppressWarnings("unused")
+ public BaseMapSerializerSnapshot() {
+ // this constructor is used when restoring from a checkpoint/savepoint.
+ }
+
+ BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) {
+ this.previousKeyType = keyT;
+ this.previousValueType = valueT;
+ }
+
+ @Override
+ public int getCurrentVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public void writeSnapshot(DataOutputView out) throws IOException {
+ InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousKeyType);
+ InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousValueType);
+ }
+
+ @Override
+ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+ try {
+ this.previousKeyType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+ this.previousValueType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public TypeSerializer<BaseMap> restoreSerializer() {
+ return new BaseMapSerializer(previousKeyType, previousValueType);
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility<BaseMap> resolveSchemaCompatibility(TypeSerializer<BaseMap> newSerializer) {
+ if (!(newSerializer instanceof BaseMapSerializer)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ }
+
+ BaseMapSerializer newBaseMapSerializer = (BaseMapSerializer) newSerializer;
+ if (!previousKeyType.equals(newBaseMapSerializer.keyType) ||
+ !previousValueType.equals(newBaseMapSerializer.valueType)) {
+ return TypeSerializerSchemaCompatibility.incompatible();
+ } else {
+ return TypeSerializerSchemaCompatibility.compatibleAsIs();
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
index 799f803..4bf0546 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
@@ -53,6 +53,9 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
private final LogicalType[] types;
private final TypeSerializer[] fieldSerializers;
+ private transient BinaryRow reuseRow;
+ private transient BinaryRowWriter reuseWriter;
+
public BaseRowSerializer(ExecutionConfig config, RowType rowType) {
this(rowType.getChildren().toArray(new LogicalType[0]),
rowType.getChildren().stream()
@@ -85,7 +88,7 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
@Override
public void serialize(BaseRow row, DataOutputView target) throws IOException {
- binarySerializer.serialize(baseRowToBinary(row), target);
+ binarySerializer.serialize(toBinaryRow(row), target);
}
@Override
@@ -165,49 +168,33 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
/**
* Convert base row to binary row.
- * TODO modify it to code gen, and reuse BinaryRow&BinaryRowWriter.
+ * TODO modify it to code gen.
*/
@Override
- public BinaryRow baseRowToBinary(BaseRow row) {
+ public BinaryRow toBinaryRow(BaseRow row) {
if (row instanceof BinaryRow) {
return (BinaryRow) row;
}
- BinaryRow binaryRow = new BinaryRow(types.length);
- BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
- writer.writeHeader(row.getHeader());
- for (int i = 0; i < types.length; i++) {
- if (row.isNullAt(i)) {
- writer.setNullAt(i);
- } else {
- BinaryWriter.write(writer, i, TypeGetterSetters.get(row, i, types[i]), types[i]);
- }
+ if (reuseRow == null) {
+ reuseRow = new BinaryRow(types.length);
+ reuseWriter = new BinaryRowWriter(reuseRow);
}
- writer.complete();
- return binaryRow;
- }
-
- public static BinaryRow baseRowToBinary(BaseRow row, RowType type) {
- if (row instanceof BinaryRow) {
- return (BinaryRow) row;
- }
- BinaryRow binaryRow = new BinaryRow(type.getFields().size());
- BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
- writer.writeHeader(row.getHeader());
- for (int i = 0; i < binaryRow.getArity(); i++) {
+ reuseWriter.reset();
+ reuseWriter.writeHeader(row.getHeader());
+ for (int i = 0; i < types.length; i++) {
if (row.isNullAt(i)) {
- writer.setNullAt(i);
+ reuseWriter.setNullAt(i);
} else {
- LogicalType fieldType = type.getFields().get(i).getType();
- BinaryWriter.write(writer, i, TypeGetterSetters.get(row, i, fieldType), fieldType);
+ BinaryWriter.write(reuseWriter, i, TypeGetterSetters.get(row, i, types[i]), types[i], fieldSerializers[i]);
}
}
- writer.complete();
- return binaryRow;
+ reuseWriter.complete();
+ return reuseRow;
}
@Override
public int serializeToPages(BaseRow row, AbstractPagedOutputView target) throws IOException {
- return binarySerializer.serializeToPages(baseRowToBinary(row), target);
+ return binarySerializer.serializeToPages(toBinaryRow(row), target);
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
deleted file mode 100644
index 7d988f3..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.dataformat.BinaryArray;
-import org.apache.flink.table.util.SegmentsUtil;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link BinaryArray}.
- */
-public class BinaryArraySerializer extends TypeSerializerSingleton<BinaryArray> {
-
- public static final BinaryArraySerializer INSTANCE = new BinaryArraySerializer();
-
- private BinaryArraySerializer() {}
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public BinaryArray createInstance() {
- return new BinaryArray();
- }
-
- @Override
- public BinaryArray copy(BinaryArray from) {
- return from.copy();
- }
-
- @Override
- public BinaryArray copy(BinaryArray from, BinaryArray reuse) {
- return from.copy(reuse);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(BinaryArray record, DataOutputView target) throws IOException {
- target.writeInt(record.getSizeInBytes());
- SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
- }
-
- @Override
- public BinaryArray deserialize(DataInputView source) throws IOException {
- return deserialize(new BinaryArray(), source);
- }
-
- @Override
- public BinaryArray deserialize(BinaryArray reuse, DataInputView source) throws IOException {
- int length = source.readInt();
- byte[] bytes = new byte[length];
- source.readFully(bytes);
- reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- int length = source.readInt();
- target.writeInt(length);
- target.write(source, length);
- }
-
- @Override
- public TypeSerializerSnapshot<BinaryArray> snapshotConfiguration() {
- return new BinaryArraySerializerSnapshot();
- }
-
- /**
- * Serializer configuration snapshot for compatibility and format evolution.
- */
- @SuppressWarnings("WeakerAccess")
- public static final class BinaryArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<BinaryArray> {
-
- public BinaryArraySerializerSnapshot() {
- super(() -> INSTANCE);
- }
- }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
deleted file mode 100644
index e46e066..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.dataformat.BinaryMap;
-import org.apache.flink.table.util.SegmentsUtil;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link BinaryMap}.
- */
-public class BinaryMapSerializer extends TypeSerializerSingleton<BinaryMap> {
-
- public static final BinaryMapSerializer INSTANCE = new BinaryMapSerializer();
-
- private BinaryMapSerializer() {}
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public BinaryMap createInstance() {
- return new BinaryMap();
- }
-
- @Override
- public BinaryMap copy(BinaryMap from) {
- return from.copy();
- }
-
- @Override
- public BinaryMap copy(BinaryMap from, BinaryMap reuse) {
- return from.copy(reuse);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(BinaryMap record, DataOutputView target) throws IOException {
- target.writeInt(record.getSizeInBytes());
- SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
- }
-
- @Override
- public BinaryMap deserialize(DataInputView source) throws IOException {
- return deserialize(new BinaryMap(), source);
- }
-
- @Override
- public BinaryMap deserialize(BinaryMap reuse, DataInputView source) throws IOException {
- int length = source.readInt();
- byte[] bytes = new byte[length];
- source.readFully(bytes);
- reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- int length = source.readInt();
- target.writeInt(length);
- target.write(source, length);
- }
-
- @Override
- public TypeSerializerSnapshot<BinaryMap> snapshotConfiguration() {
- return new BinaryMapSerializerSnapshot();
- }
-
- /**
- * Serializer configuration snapshot for compatibility and format evolution.
- */
- @SuppressWarnings("WeakerAccess")
- public static final class BinaryMapSerializerSnapshot extends SimpleTypeSerializerSnapshot<BinaryMap> {
-
- public BinaryMapSerializerSnapshot() {
- super(() -> INSTANCE);
- }
- }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
index 9804767..c187471 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
@@ -124,7 +124,7 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
}
@Override
- public BinaryRow baseRowToBinary(BinaryRow baseRow) throws IOException {
+ public BinaryRow toBinaryRow(BinaryRow baseRow) throws IOException {
return baseRow;
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index dd6350a..43bc6a6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -18,8 +18,12 @@
package org.apache.flink.table.dataformat;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.junit.Before;
import org.junit.Test;
@@ -97,9 +101,9 @@ public class BaseRowTest {
writer.writeGeneric(9, generic);
writer.writeDecimal(10, decimal1, 5);
writer.writeDecimal(11, decimal2, 20);
- writer.writeArray(12, array);
- writer.writeMap(13, map);
- writer.writeRow(14, underRow, RowType.of(new IntType(), new IntType()));
+ writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
+ writer.writeMap(13, map, new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+ writer.writeRow(14, underRow, new BaseRowSerializer(null, RowType.of(new IntType(), new IntType())));
writer.writeBinary(15, bytes);
return row;
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index d7e8147..54d0802 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -21,9 +21,13 @@ package org.apache.flink.table.dataformat;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.util.SegmentsUtil;
import org.junit.Assert;
@@ -59,10 +63,10 @@ public class BinaryArrayTest {
{
BinaryRow row2 = new BinaryRow(1);
BinaryRowWriter writer2 = new BinaryRowWriter(row2);
- writer2.writeArray(0, array);
+ writer2.writeArray(0, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
writer2.complete();
- BinaryArray array2 = row2.getArray(0);
+ BinaryArray array2 = (BinaryArray) row2.getArray(0);
assertEquals(array2, array);
assertEquals(array2.getInt(0), 6);
assertTrue(array2.isNullAt(1));
@@ -75,10 +79,10 @@ public class BinaryArrayTest {
BinaryRow row2 = new BinaryRow(1);
BinaryRowWriter writer2 = new BinaryRowWriter(row2);
- writer2.writeArray(0, array3);
+ writer2.writeArray(0, array3, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
writer2.complete();
- BinaryArray array2 = row2.getArray(0);
+ BinaryArray array2 = (BinaryArray) row2.getArray(0);
assertEquals(array2, array);
assertEquals(array2.getInt(0), 6);
assertTrue(array2.isNullAt(1));
@@ -304,7 +308,7 @@ public class BinaryArrayTest {
BinaryArray array = new BinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
- writer.writeArray(1, subArray);
+ writer.writeArray(1, subArray, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
writer.complete();
assertTrue(array.isNullAt(0));
@@ -320,7 +324,8 @@ public class BinaryArrayTest {
BinaryArray array = new BinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
writer.setNullAt(0);
- writer.writeMap(1, BinaryMap.valueOf(subArray, subArray));
+ writer.writeMap(1, BinaryMap.valueOf(subArray, subArray),
+ new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
writer.complete();
assertTrue(array.isNullAt(0));
@@ -352,10 +357,11 @@ public class BinaryArrayTest {
BinaryRow row = new BinaryRow(1);
BinaryRowWriter rowWriter = new BinaryRowWriter(row);
- rowWriter.writeMap(0, binaryMap);
+ rowWriter.writeMap(0, binaryMap,
+ new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
rowWriter.complete();
- BinaryMap map = row.getMap(0);
+ BinaryMap map = (BinaryMap) row.getMap(0);
BinaryArray key = map.keyArray();
BinaryArray value = map.valueArray();
@@ -468,7 +474,8 @@ public class BinaryArrayTest {
public void testNested() {
BinaryArray array = new BinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
- writer.writeRow(0, GenericRow.of(fromString("1"), 1), RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
+ writer.writeRow(0, GenericRow.of(fromString("1"), 1),
+ new BaseRowSerializer(null, RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
writer.setNullAt(1);
writer.complete();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index 0be17dd..b4a6387 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.junit.Assert;
@@ -431,7 +432,8 @@ public class BinaryRowTest {
public void testNested() {
BinaryRow row = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(row);
- writer.writeRow(0, GenericRow.of(fromString("1"), 1), RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
+ writer.writeRow(0, GenericRow.of(fromString("1"), 1),
+ new BaseRowSerializer(null, RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
writer.setNullAt(1);
writer.complete();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
index 95c3549..c76b3e5 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
@@ -18,12 +18,15 @@
package org.apache.flink.table.runtime.util;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.dataformat.BinaryWriter;
import org.apache.flink.table.dataformat.TypeGetterSetters;
import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
+import org.apache.flink.table.types.InternalSerializers;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
@@ -37,13 +40,17 @@ public class BinaryRowKeySelector implements BaseRowKeySelector {
private final int[] keyFields;
private final LogicalType[] inputFieldTypes;
private final LogicalType[] keyFieldTypes;
+ private final TypeSerializer[] keySers;
public BinaryRowKeySelector(int[] keyFields, LogicalType[] inputFieldTypes) {
this.keyFields = keyFields;
this.inputFieldTypes = inputFieldTypes;
this.keyFieldTypes = new LogicalType[keyFields.length];
+ this.keySers = new TypeSerializer[keyFields.length];
+ ExecutionConfig conf = new ExecutionConfig();
for (int i = 0; i < keyFields.length; ++i) {
keyFieldTypes[i] = inputFieldTypes[keyFields[i]];
+ keySers[i] = InternalSerializers.create(keyFieldTypes[i], conf);
}
}
@@ -59,7 +66,8 @@ public class BinaryRowKeySelector implements BaseRowKeySelector {
writer,
i,
TypeGetterSetters.get(value, keyFields[i], inputFieldTypes[keyFields[i]]),
- inputFieldTypes[keyFields[i]]);
+ inputFieldTypes[keyFields[i]],
+ keySers[i]);
}
}
writer.complete();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
similarity index 53%
rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
index d1e5b35..feb9893 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
@@ -18,19 +18,49 @@
package org.apache.flink.table.typeutils;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseArray;
import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryArrayWriter;
import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.GenericArray;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
/**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BaseArraySerializer}.
*/
-public class BinaryArraySerializerTest extends SerializerTestBase<BinaryArray> {
+public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
+
+ public BaseArraySerializerTest() {
+ super(new DeeplyEqualsChecker().withCustomCheck(
+ (o1, o2) -> o1 instanceof BaseArray && o2 instanceof BaseArray,
+ (o1, o2, checker) -> {
+ BaseArray array1 = (BaseArray) o1;
+ BaseArray array2 = (BaseArray) o2;
+ if (array1.numElements() != array2.numElements()) {
+ return false;
+ }
+ for (int i = 0; i < array1.numElements(); i++) {
+ if (!array1.isNullAt(i) || !array2.isNullAt(i)) {
+ if (array1.isNullAt(i) || array2.isNullAt(i)) {
+ return false;
+ } else {
+ if (!array1.getString(i).equals(array2.getString(i))) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+ ));
+ }
@Override
- protected BinaryArraySerializer createSerializer() {
- return BinaryArraySerializer.INSTANCE;
+ protected BaseArraySerializer createSerializer() {
+ return new BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig());
}
@Override
@@ -39,14 +69,14 @@ public class BinaryArraySerializerTest extends SerializerTestBase<BinaryArray> {
}
@Override
- protected Class<BinaryArray> getTypeClass() {
- return BinaryArray.class;
+ protected Class<BaseArray> getTypeClass() {
+ return BaseArray.class;
}
@Override
- protected BinaryArray[] getTestData() {
- return new BinaryArray[] {
- createArray("11"),
+ protected BaseArray[] getTestData() {
+ return new BaseArray[] {
+ new GenericArray(new BinaryString[] {BinaryString.fromString("11")}, 1),
createArray("11", "haa"),
createArray("11", "haa", "ke"),
createArray("11", "haa", "ke"),
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
new file mode 100644
index 0000000..38216dc
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A test for the {@link BaseMapSerializer}.
+ */
+public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
+
+ private static final LogicalType INT = DataTypes.INT().getLogicalType();
+ private static final LogicalType STRING = DataTypes.STRING().getLogicalType();
+
+ public BaseMapSerializerTest() {
+ super(new DeeplyEqualsChecker().withCustomCheck(
+ (o1, o2) -> o1 instanceof BaseMap && o2 instanceof BaseMap,
+ (o1, o2, checker) ->
+ // It is more appropriate to compare the maps after converting them to Java maps
+ // rather than as binary maps. For example, consider the following two maps:
+ // {1: 'a', 2: 'b', 3: 'c'} and {3: 'c', 2: 'b', 1: 'a'}
+ // These are actually the same maps, but their key / value order will be
+ // different when stored as binary maps, and the equalsTo method of binary
+ // map will return false.
+ ((BaseMap) o1).toJavaMap(INT, STRING)
+ .equals(((BaseMap) o2).toJavaMap(INT, STRING))
+ ));
+ }
+
+ private static BaseMapSerializer newSer() {
+ return new BaseMapSerializer(INT, STRING);
+ }
+
+ @Override
+ protected BaseMapSerializer createSerializer() {
+ return newSer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<BaseMap> getTypeClass() {
+ return BaseMap.class;
+ }
+
+ @Override
+ protected BaseMap[] getTestData() {
+ Map<Object, Object> first = new HashMap<>();
+ first.put(1, BinaryString.fromString(""));
+ return new BaseMap[] {
+ new GenericMap(first),
+ BinaryMap.valueOf(createArray(1, 2), BaseArraySerializerTest.createArray("11", "haa")),
+ BinaryMap.valueOf(createArray(1, 3, 4), BaseArraySerializerTest.createArray("11", "haa", "ke")),
+ BinaryMap.valueOf(createArray(1, 4, 2), BaseArraySerializerTest.createArray("11", "haa", "ke")),
+ BinaryMap.valueOf(createArray(1, 5, 6, 7), BaseArraySerializerTest.createArray("11", "lele", "haa", "ke"))
+ };
+ }
+
+ private static BinaryArray createArray(int... vs) {
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 4);
+ for (int i = 0; i < vs.length; i++) {
+ writer.writeInt(i, vs[i]);
+ }
+ writer.complete();
+ return array;
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
index 9295973..c392c19 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.testutils.DeeplyEqualsChecker;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -62,7 +63,9 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
new DeeplyEqualsChecker()
.withCustomCheck(
(o1, o2) -> o1 instanceof BaseRow && o2 instanceof BaseRow,
- (o1, o2, checker) -> deepEqualsBaseRow((BaseRow) o1, (BaseRow) o2, serializer)
+ (o1, o2, checker) -> deepEqualsBaseRow((BaseRow) o1, (BaseRow) o2,
+ (BaseRowSerializer) serializer.duplicate(),
+ (BaseRowSerializer) serializer.duplicate())
),
serializer,
BaseRow.class,
@@ -182,39 +185,44 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
return row;
}
- private static boolean deepEqualsBaseRow(BaseRow should, BaseRow is, BaseRowSerializer serializer) {
+ private static boolean deepEqualsBaseRow(BaseRow should, BaseRow is,
+ BaseRowSerializer serializer1, BaseRowSerializer serializer2) {
if (should.getArity() != is.getArity()) {
return false;
}
+ BinaryRow row1 = serializer1.toBinaryRow(should);
+ BinaryRow row2 = serializer2.toBinaryRow(is);
- return Objects.equals(serializer.baseRowToBinary(should), serializer.baseRowToBinary(is));
+ return Objects.equals(row1, row2);
}
- private boolean deepEquals(BaseRow should, BaseRow is) {
- return deepEqualsBaseRow(should, is, serializer);
+ private void checkDeepEquals(BaseRow should, BaseRow is) {
+ boolean equals = deepEqualsBaseRow(should, is,
+ (BaseRowSerializer) serializer.duplicate(), (BaseRowSerializer) serializer.duplicate());
+ Assert.assertTrue(equals);
}
@Test
public void testCopy() {
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(row));
+ checkDeepEquals(row, serializer.copy(row));
}
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(row, new GenericRow(row.getArity())));
+ checkDeepEquals(row, serializer.copy(row, new GenericRow(row.getArity())));
}
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(serializer.baseRowToBinary(row),
+ checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row),
new GenericRow(row.getArity())));
}
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(serializer.baseRowToBinary(row)));
+ checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)));
}
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(serializer.baseRowToBinary(row),
+ checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row),
new BinaryRow(row.getArity())));
}
}
@@ -229,7 +237,7 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
public void testWrongCopyReuse() {
thrown.expect(IllegalArgumentException.class);
for (BaseRow row : testData) {
- deepEquals(row, serializer.copy(row, new GenericRow(row.getArity() + 1)));
+ checkDeepEquals(row, serializer.copy(row, new GenericRow(row.getArity() + 1)));
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
index dc51eb8..f8fc22b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.table.dataformat.BinaryGeneric;
/**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BinaryGenericSerializer}.
*/
public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneric<String>> {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
deleted file mode 100644
index fe6b4e5..0000000
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.table.dataformat.BinaryArray;
-import org.apache.flink.table.dataformat.BinaryArrayWriter;
-import org.apache.flink.table.dataformat.BinaryMap;
-
-/**
- * A test for the {@link BinaryArraySerializer}.
- */
-public class BinaryMapSerializerTest extends SerializerTestBase<BinaryMap> {
-
- @Override
- protected BinaryMapSerializer createSerializer() {
- return BinaryMapSerializer.INSTANCE;
- }
-
- @Override
- protected int getLength() {
- return -1;
- }
-
- @Override
- protected Class<BinaryMap> getTypeClass() {
- return BinaryMap.class;
- }
-
- @Override
- protected BinaryMap[] getTestData() {
- return new BinaryMap[] {
- BinaryMap.valueOf(createArray(1), BinaryArraySerializerTest.createArray("11")),
- BinaryMap.valueOf(createArray(1, 2), BinaryArraySerializerTest.createArray("11", "haa")),
- BinaryMap.valueOf(createArray(1, 3, 4), BinaryArraySerializerTest.createArray("11", "haa", "ke")),
- BinaryMap.valueOf(createArray(1, 4, 2), BinaryArraySerializerTest.createArray("11", "haa", "ke")),
- BinaryMap.valueOf(createArray(1, 5, 6, 7), BinaryArraySerializerTest.createArray("11", "lele", "haa", "ke"))
- };
- }
-
- private static BinaryArray createArray(int... vs) {
- BinaryArray array = new BinaryArray();
- BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 8);
- for (int i = 0; i < vs.length; i++) {
- writer.writeInt(i, vs[i]);
- }
- writer.complete();
- return array;
- }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
index 11f3ea3..f7b1f9e 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.dataformat.BinaryString;
/**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BinaryRowSerializer}.
*/
public class BinaryRowSerializerTest extends SerializerTestBase<BinaryRow> {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
index 8fdb7b0..9f9a477 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.table.dataformat.Decimal;
/**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link DecimalSerializer}.
*/
public class DecimalSerializerTest extends SerializerTestBase<Decimal> {