Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/05 06:44:11 UTC

[flink] branch master updated: [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 906616b  [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead
906616b is described below

commit 906616baeaf482662d5ba621c14e513912977c5e
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Fri Jul 5 14:43:59 2019 +0800

    [FLINK-12796][table-planner-blink] Introduce BaseArray and BaseMap to reduce conversion overhead
    
    This closes #8682
---
 .../apache/flink/table/codegen/CodeGenUtils.scala  |  20 +-
 .../apache/flink/table/codegen/GenerateUtils.scala |   2 +-
 .../table/codegen/calls/ScalarOperatorGens.scala   | 208 +++++++++++----
 .../flink/table/codegen/SortCodeGeneratorTest.java |   4 +-
 .../table/runtime/utils/RangeInputFormat.java      |  67 +++++
 .../apache/flink/table/util/BaseRowTestUtil.java   |   5 +-
 .../runtime/batch/sql/agg/SortAggITCase.scala      | 249 ++++++++++++++++-
 .../table/runtime/utils/BatchTableEnvUtil.scala    |   2 +-
 .../flink/table/runtime/utils/BatchTestBase.scala  |  24 +-
 .../table/dataformat/AbstractBinaryWriter.java     |  17 +-
 .../apache/flink/table/dataformat/BaseArray.java   |  65 +++++
 .../org/apache/flink/table/dataformat/BaseMap.java |  53 ++++
 .../apache/flink/table/dataformat/BinaryArray.java |  35 ++-
 .../flink/table/dataformat/BinaryArrayWriter.java  |   4 +
 .../apache/flink/table/dataformat/BinaryMap.java   |  19 +-
 .../apache/flink/table/dataformat/BinaryRow.java   |   4 +-
 .../flink/table/dataformat/BinaryWriter.java       |  21 +-
 .../apache/flink/table/dataformat/ColumnarRow.java |   4 +-
 .../table/dataformat/DataFormatConverters.java     | 186 +++++++++----
 .../flink/table/dataformat/GenericArray.java       | 295 +++++++++++++++++++++
 .../apache/flink/table/dataformat/GenericMap.java} |  44 +--
 .../apache/flink/table/dataformat/JoinedRow.java   |   4 +-
 .../apache/flink/table/dataformat/NestedRow.java   |   4 +-
 .../flink/table/dataformat/ObjectArrayRow.java     |   8 +-
 .../flink/table/dataformat/TypeGetterSetters.java  |   8 +-
 .../flink/table/dataformat/UpdatableRow.java       |   6 +-
 .../table/runtime/hashtable/BinaryHashTable.java   |   4 +-
 .../table/runtime/sort/StreamSortOperator.java     |   2 +-
 .../table/types/ClassLogicalTypeConverter.java     |   8 +-
 .../flink/table/types/InternalSerializers.java     |  14 +-
 .../table/types/TypeInfoDataTypeConverter.java     |   4 +-
 .../table/typeutils/AbstractRowSerializer.java     |   2 +-
 .../flink/table/typeutils/BaseArraySerializer.java | 280 +++++++++++++++++++
 .../flink/table/typeutils/BaseMapSerializer.java   | 285 ++++++++++++++++++++
 .../flink/table/typeutils/BaseRowSerializer.java   |  47 ++--
 .../table/typeutils/BinaryArraySerializer.java     | 107 --------
 .../flink/table/typeutils/BinaryMapSerializer.java | 107 --------
 .../flink/table/typeutils/BinaryRowSerializer.java |   2 +-
 .../apache/flink/table/dataformat/BaseRowTest.java |  10 +-
 .../flink/table/dataformat/BinaryArrayTest.java    |  25 +-
 .../flink/table/dataformat/BinaryRowTest.java      |   4 +-
 .../table/runtime/util/BinaryRowKeySelector.java   |  10 +-
 ...lizerTest.java => BaseArraySerializerTest.java} |  48 +++-
 .../table/typeutils/BaseMapSerializerTest.java     |  99 +++++++
 .../table/typeutils/BaseRowSerializerTest.java     |  30 ++-
 .../typeutils/BinaryGenericSerializerTest.java     |   2 +-
 .../table/typeutils/BinaryMapSerializerTest.java   |  66 -----
 .../table/typeutils/BinaryRowSerializerTest.java   |   2 +-
 .../table/typeutils/DecimalSerializerTest.java     |   2 +-
 49 files changed, 1972 insertions(+), 546 deletions(-)
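In short: planner and runtime code now program against the new BaseArray / BaseMap interfaces instead of the concrete binary formats, so object-backed (generic) arrays and maps are converted to their binary form only when a binary writer actually needs them. A consumer-side sketch of the idea (illustrative only, not part of the patch; getLong comes from the TypeGetterSetters super-interface):

    import org.apache.flink.table.dataformat.BaseArray;

    // Consumers read through the BaseArray interface, so a BinaryArray and a
    // GenericArray behave identically and no eager format conversion is needed.
    static long sumElements(BaseArray array) {
        long total = 0L;
        for (int i = 0; i < array.numElements(); i++) {
            if (!array.isNullAt(i)) {
                total += array.getLong(i);
            }
        }
        return total;
    }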

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 1670a95..e324c12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -66,12 +66,16 @@ object CodeGenUtils {
 
   val BINARY_ARRAY: String = className[BinaryArray]
 
+  val BASE_ARRAY: String = className[BaseArray]
+
   val BINARY_GENERIC: String = className[BinaryGeneric[_]]
 
   val BINARY_STRING: String = className[BinaryString]
 
   val BINARY_MAP: String = className[BinaryMap]
 
+  val BASE_MAP: String = className[BaseMap]
+
   val BASE_ROW: String = className[BaseRow]
 
   val JOINED_ROW: String = className[JoinedRow]
@@ -152,8 +156,8 @@ object CodeGenUtils {
     case VARBINARY | BINARY => "byte[]"
 
     case DECIMAL => className[Decimal]
-    case ARRAY => className[BinaryArray]
-    case MULTISET | MAP => className[BinaryMap]
+    case ARRAY => className[BaseArray]
+    case MULTISET | MAP => className[BaseMap]
     case ROW => className[BaseRow]
 
     case ANY => className[BinaryGeneric[_]]
@@ -631,11 +635,15 @@ object CodeGenUtils {
       case INTERVAL_DAY_TIME => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
 
       // complex types
-      case ARRAY => s"$writerTerm.writeArray($indexTerm, $fieldValTerm)"
-      case MULTISET | MAP => s"$writerTerm.writeMap($indexTerm, $fieldValTerm)"
+      case ARRAY =>
+        val ser = ctx.addReusableTypeSerializer(t)
+        s"$writerTerm.writeArray($indexTerm, $fieldValTerm, $ser)"
+      case MULTISET | MAP =>
+        val ser = ctx.addReusableTypeSerializer(t)
+        s"$writerTerm.writeMap($indexTerm, $fieldValTerm, $ser)"
       case ROW =>
-        val typeTerm = ctx.addReusableObject(t, "rowType")
-        s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $typeTerm)"
+        val ser = ctx.addReusableTypeSerializer(t)
+        s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $ser)"
 
       case ANY => s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm)"
     }
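For reference, the shape of the Java emitted by the writer codegen above: the serializer is created once via ctx.addReusableTypeSerializer(t) and reused as a class member, so each field write goes through it (field and serializer names are illustrative):

    // generated per-field write calls (names illustrative)
    writer.writeArray(0, field$0, typeSerializer$0);  // ARRAY field
    writer.writeMap(1, field$1, typeSerializer$1);    // MULTISET / MAP field
    writer.writeRow(2, field$2, typeSerializer$2);    // ROW field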
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
index 4b39c05..70db90a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala
@@ -669,7 +669,7 @@ object GenerateUtils {
         SortUtil.getNullDefaultOrder(true), at, "a", "b")
       val funcCode: String =
         s"""
-          public int $compareFunc($BINARY_ARRAY a, $BINARY_ARRAY b) {
+          public int $compareFunc($BASE_ARRAY a, $BASE_ARRAY b) {
             $compareCode
             return 0;
           }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
index 95b785d..4e8f282 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala
@@ -1495,7 +1495,7 @@ object ScalarOperatorGens {
 
     checkArgument(resultType.isInstanceOf[MapType])
     val mapType = resultType.asInstanceOf[MapType]
-    val mapTerm = newName("map")
+    val baseMap = newName("map")
 
     // prepare map key array
     val keyElements = elements.grouped(2).map { case Seq(key, _) => key }.toSeq
@@ -1510,11 +1510,14 @@ object ScalarOperatorGens {
     val isValueFixLength = isPrimitive(valueType)
 
     // construct binary map
-    ctx.addReusableMember(s"$BINARY_MAP $mapTerm = null;")
+    ctx.addReusableMember(s"$BASE_MAP $baseMap = null;")
 
     val code = if (isKeyFixLength && isValueFixLength) {
+      val binaryMap = newName("binaryMap")
+      ctx.addReusableMember(s"$BINARY_MAP $binaryMap = null;")
       // the key and value are fixed length, initialize and reuse the map in constructor
-      val init = s"$mapTerm = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});"
+      val init =
+        s"$binaryMap = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});"
       ctx.addReusableInitStatement(init)
       // there are some non-literal primitive fields that need to be updated
       val keyArrayTerm = newName("keyArray")
@@ -1524,20 +1527,21 @@ object ScalarOperatorGens {
       val valueUpdate = generatePrimitiveArrayUpdateCode(
         ctx, valueArrayTerm, valueType, valueElements)
       s"""
-         |$BINARY_ARRAY $keyArrayTerm = $mapTerm.keyArray();
+         |$BINARY_ARRAY $keyArrayTerm = $binaryMap.keyArray();
          |$keyUpdate
-         |$BINARY_ARRAY $valueArrayTerm = $mapTerm.valueArray();
+         |$BINARY_ARRAY $valueArrayTerm = $binaryMap.valueArray();
          |$valueUpdate
+         |$baseMap = $binaryMap;
        """.stripMargin
     } else {
       // the key or value is not fixed length, re-create the map on every update
       s"""
          |${keyExpr.code}
          |${valueExpr.code}
-         |$mapTerm = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});
+         |$baseMap = $BINARY_MAP.valueOf(${keyExpr.resultTerm}, ${valueExpr.resultTerm});
        """.stripMargin
     }
-    GeneratedExpression(mapTerm, NEVER_NULL, code, resultType)
+    GeneratedExpression(baseMap, NEVER_NULL, code, resultType)
   }
 
   def generateMapGet(
@@ -1551,6 +1555,7 @@ object ScalarOperatorGens {
     val values = newName("values")
     val index = newName("index")
     val found = newName("found")
+    val tmpValue = newName("value")
 
     val mapType = map.resultType.asInstanceOf[MapType]
     val keyType = mapType.getKeyType
@@ -1560,42 +1565,59 @@ object ScalarOperatorGens {
     val keyTypeTerm = primitiveTypeTermForType(keyType)
     val valueTypeTerm = primitiveTypeTermForType(valueType)
     val valueDefault = primitiveDefaultValue(valueType)
+    val binaryMapTypeTerm = classOf[BinaryMap].getCanonicalName
+    val binaryMapTerm = newName("binaryMap")
+    val genericMapTypeTerm = classOf[GenericMap].getCanonicalName
+    val genericMapTerm = newName("genericMap")
+    val boxedValueTypeTerm = boxedTypeTermForType(valueType)
 
     val mapTerm = map.resultTerm
 
     val equal = generateEquals(ctx, key, GeneratedExpression(tmpKey, NEVER_NULL, NO_CODE, keyType))
     val code =
       s"""
-         |final int $length = $mapTerm.numElements();
-         |final $BINARY_ARRAY $keys = $mapTerm.keyArray();
-         |final $BINARY_ARRAY $values = $mapTerm.valueArray();
+         |if ($mapTerm instanceof $binaryMapTypeTerm) {
+         |  $binaryMapTypeTerm $binaryMapTerm = ($binaryMapTypeTerm) $mapTerm;
+         |  final int $length = $binaryMapTerm.numElements();
+         |  final $BINARY_ARRAY $keys = $binaryMapTerm.keyArray();
+         |  final $BINARY_ARRAY $values = $binaryMapTerm.valueArray();
          |
-         |int $index = 0;
-         |boolean $found = false;
-         |if (${key.nullTerm}) {
-         |  while ($index < $length && !$found) {
-         |    if ($keys.isNullAt($index)) {
-         |      $found = true;
-         |    } else {
-         |      $index++;
+         |  int $index = 0;
+         |  boolean $found = false;
+         |  if (${key.nullTerm}) {
+         |    while ($index < $length && !$found) {
+         |      if ($keys.isNullAt($index)) {
+         |        $found = true;
+         |      } else {
+         |        $index++;
+         |      }
          |    }
-         |  }
-         |} else {
-         |  while ($index < $length && !$found) {
-         |    final $keyTypeTerm $tmpKey = ${baseRowFieldReadAccess(ctx, index, keys, keyType)};
-         |    ${equal.code}
-         |    if (${equal.resultTerm}) {
-         |      $found = true;
-         |    } else {
-         |      $index++;
+         |  } else {
+         |    while ($index < $length && !$found) {
+         |      final $keyTypeTerm $tmpKey = ${baseRowFieldReadAccess(ctx, index, keys, keyType)};
+         |      ${equal.code}
+         |      if (${equal.resultTerm}) {
+         |        $found = true;
+         |      } else {
+         |        $index++;
+         |      }
          |    }
          |  }
-         |}
          |
-         |if (!$found || $values.isNullAt($index)) {
-         |  $nullTerm = true;
+         |  if (!$found || $values.isNullAt($index)) {
+         |    $nullTerm = true;
+         |  } else {
+         |    $resultTerm = ${baseRowFieldReadAccess(ctx, index, values, valueType)};
+         |  }
          |} else {
-         |  $resultTerm = ${baseRowFieldReadAccess(ctx, index, values, valueType)};
+         |  $genericMapTypeTerm $genericMapTerm = ($genericMapTypeTerm) $mapTerm;
+         |  $boxedValueTypeTerm $tmpValue =
+         |    ($boxedValueTypeTerm) $genericMapTerm.get(($keyTypeTerm) ${key.resultTerm});
+         |  if ($tmpValue == null) {
+         |    $nullTerm = true;
+         |  } else {
+         |    $resultTerm = $tmpValue;
+         |  }
          |}
         """.stripMargin
 
@@ -1707,8 +1729,12 @@ object ScalarOperatorGens {
         val builderTerm = newName("builder")
         ctx.addReusableMember(s"$builderCls $builderTerm = new $builderCls();")
 
-        val binaryMapTerm = terms.head
-        val arrayCls = classOf[BinaryArray].getCanonicalName
+        val mapTerm = terms.head
+        val genericMapCls = classOf[GenericMap].getCanonicalName
+        val genericMapTerm = newName("genericMap")
+        val binaryMapCls = classOf[BinaryMap].getCanonicalName
+        val binaryMapTerm = newName("binaryMap")
+        val arrayCls = classOf[BaseArray].getCanonicalName
         val keyArrayTerm = newName("keyArray")
         val valueArrayTerm = newName("valueArray")
 
@@ -1750,36 +1776,42 @@ object ScalarOperatorGens {
         val stmt =
           s"""
              |String $resultTerm;
-             |$arrayCls $keyArrayTerm = $binaryMapTerm.keyArray();
-             |$arrayCls $valueArrayTerm = $binaryMapTerm.valueArray();
+             |if ($mapTerm instanceof $binaryMapCls) {
+             |  $binaryMapCls $binaryMapTerm = ($binaryMapCls) $mapTerm;
+             |  $arrayCls $keyArrayTerm = $binaryMapTerm.keyArray();
+             |  $arrayCls $valueArrayTerm = $binaryMapTerm.valueArray();
              |
-             |$builderTerm.setLength(0);
-             |$builderTerm.append("{");
+             |  $builderTerm.setLength(0);
+             |  $builderTerm.append("{");
              |
-             |int $numTerm = $binaryMapTerm.numElements();
-             |for (int $indexTerm = 0; $indexTerm < $numTerm; $indexTerm++) {
-             |  if ($indexTerm != 0) {
-             |    $builderTerm.append(", ");
-             |  }
+             |  int $numTerm = $binaryMapTerm.numElements();
+             |  for (int $indexTerm = 0; $indexTerm < $numTerm; $indexTerm++) {
+             |    if ($indexTerm != 0) {
+             |      $builderTerm.append(", ");
+             |    }
              |
-             |  ${keyCastExpr.code}
-             |  if (${keyCastExpr.nullTerm}) {
-             |    $builderTerm.append("null");
-             |  } else {
-             |    $builderTerm.append(${keyCastExpr.resultTerm});
-             |  }
-             |  $builderTerm.append("=");
+             |    ${keyCastExpr.code}
+             |    if (${keyCastExpr.nullTerm}) {
+             |      $builderTerm.append("null");
+             |    } else {
+             |      $builderTerm.append(${keyCastExpr.resultTerm});
+             |    }
+             |    $builderTerm.append("=");
              |
-             |  ${valueCastExpr.code}
-             |  if (${valueCastExpr.nullTerm}) {
-             |    $builderTerm.append("null");
-             |  } else {
-             |    $builderTerm.append(${valueCastExpr.resultTerm});
+             |    ${valueCastExpr.code}
+             |    if (${valueCastExpr.nullTerm}) {
+             |      $builderTerm.append("null");
+             |    } else {
+             |      $builderTerm.append(${valueCastExpr.resultTerm});
+             |    }
              |  }
-             |}
-             |$builderTerm.append("}");
+             |  $builderTerm.append("}");
              |
-             |$resultTerm = $builderTerm.toString();
+             |  $resultTerm = $builderTerm.toString();
+             |} else {
+             |  $genericMapCls $genericMapTerm = ($genericMapCls) $mapTerm;
+             |  $resultTerm = $genericMapTerm.toString();
+             |}
              """.stripMargin
         (stmt, resultTerm)
     }
@@ -1900,8 +1932,68 @@ object ScalarOperatorGens {
       args =>
         val leftTerm = args.head
         val rightTerm = args(1)
+
         val resultTerm = newName("compareResult")
-        val stmt = s"boolean $resultTerm = $leftTerm.equals($rightTerm);"
+        val binaryMapCls = classOf[BinaryMap].getCanonicalName
+
+        val mapType = left.resultType.asInstanceOf[MapType]
+        val mapCls = classOf[java.util.Map[AnyRef, AnyRef]].getCanonicalName
+        val keyCls = boxedTypeTermForType(mapType.getKeyType)
+        val valueCls = boxedTypeTermForType(mapType.getValueType)
+
+        val leftMapTerm = newName("leftMap")
+        val leftKeyTerm = newName("leftKey")
+        val leftValueTerm = newName("leftValue")
+        val leftValueNullTerm = newName("leftValueIsNull")
+        val leftValueExpr =
+          GeneratedExpression(leftValueTerm, leftValueNullTerm, "", mapType.getValueType)
+
+        val rightMapTerm = newName("rightMap")
+        val rightValueTerm = newName("rightValue")
+        val rightValueNullTerm = newName("rightValueIsNull")
+        val rightValueExpr =
+          GeneratedExpression(rightValueTerm, rightValueNullTerm, "", mapType.getValueType)
+
+        val entryTerm = newName("entry")
+        val entryCls = classOf[java.util.Map.Entry[AnyRef, AnyRef]].getCanonicalName
+        val valueEqualsExpr = generateEquals(ctx, leftValueExpr, rightValueExpr)
+
+        val internalTypeCls = classOf[LogicalType].getCanonicalName
+        val keyTypeTerm =
+          ctx.addReusableObject(mapType.getKeyType, "keyType", internalTypeCls)
+        val valueTypeTerm =
+          ctx.addReusableObject(mapType.getValueType, "valueType", internalTypeCls)
+
+        val stmt =
+          s"""
+             |boolean $resultTerm;
+             |if ($leftTerm.numElements() == $rightTerm.numElements()) {
+             |  $resultTerm = true;
+             |  $mapCls $leftMapTerm = $leftTerm.toJavaMap($keyTypeTerm, $valueTypeTerm);
+             |  $mapCls $rightMapTerm = $rightTerm.toJavaMap($keyTypeTerm, $valueTypeTerm);
+             |
+             |  for ($entryCls $entryTerm : $leftMapTerm.entrySet()) {
+             |    $keyCls $leftKeyTerm = ($keyCls) $entryTerm.getKey();
+             |    if ($rightMapTerm.containsKey($leftKeyTerm)) {
+             |      $valueCls $leftValueTerm = ($valueCls) $entryTerm.getValue();
+             |      $valueCls $rightValueTerm = ($valueCls) $rightMapTerm.get($leftKeyTerm);
+             |      boolean $leftValueNullTerm = ($leftValueTerm == null);
+             |      boolean $rightValueNullTerm = ($rightValueTerm == null);
+             |
+             |      ${valueEqualsExpr.code}
+             |      if (!${valueEqualsExpr.resultTerm}) {
+             |        $resultTerm = false;
+             |        break;
+             |      }
+             |    } else {
+             |      $resultTerm = false;
+             |      break;
+             |    }
+             |  }
+             |} else {
+             |  $resultTerm = false;
+             |}
+             """.stripMargin
         (stmt, resultTerm)
     }
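The generated MAP[key] access above amounts to the following runtime dispatch, sketched by hand for a BIGINT-keyed, BIGINT-valued map (GenericMap.get(Object) is assumed from its use in the generated code; the real codegen inlines this logic instead of calling a helper):

    import org.apache.flink.table.dataformat.BaseMap;
    import org.apache.flink.table.dataformat.BinaryArray;
    import org.apache.flink.table.dataformat.BinaryMap;
    import org.apache.flink.table.dataformat.GenericMap;

    static Long mapGet(BaseMap map, long key) {
        if (map instanceof BinaryMap) {
            BinaryMap binaryMap = (BinaryMap) map;
            BinaryArray keys = binaryMap.keyArray();
            BinaryArray values = binaryMap.valueArray();
            // linear scan over the binary key array, as in the generated loop
            for (int i = 0; i < binaryMap.numElements(); i++) {
                if (!keys.isNullAt(i) && keys.getLong(i) == key) {
                    return values.isNullAt(i) ? null : values.getLong(i);
                }
            }
            return null; // key not found
        } else {
            // generic maps answer lookups directly, no binary scan needed
            GenericMap genericMap = (GenericMap) map;
            return (Long) genericMap.get(key);
        }
    }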
   
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
index 1526744..890ef80 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.table.generated.RecordComparator;
 import org.apache.flink.table.plan.util.SortUtil;
 import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.InternalSerializers;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -181,7 +182,8 @@ public class SortCodeGeneratorTest {
 			if (value == null) {
 				writer.setNullAt(j);
 			} else {
-				BinaryWriter.write(writer, j, value, types[fields[j]]);
+				BinaryWriter.write(writer, j, value, types[fields[j]],
+						InternalSerializers.create(types[fields[j]], new ExecutionConfig()));
 			}
 		}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java
new file mode 100644
index 0000000..c642c85
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/runtime/utils/RangeInputFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.BoxedWrapperRow;
+
+import java.io.IOException;
+
+/**
+ * An input format that emits one row per long value in the half-open range [start, end).
+ */
+public class RangeInputFormat extends GenericInputFormat<BaseRow> implements NonParallelInput {
+
+	private static final long serialVersionUID = 1L;
+
+	private long start;
+	private long end;
+
+	private transient long current;
+	private transient BoxedWrapperRow reuse;
+
+	public RangeInputFormat(long start, long end) {
+		this.start = start;
+		this.end = end;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return current >= end;
+	}
+
+	@Override
+	public void open(GenericInputSplit split) throws IOException {
+		super.open(split);
+		this.current = start;
+	}
+
+	@Override
+	public BaseRow nextRecord(BaseRow ignore) throws IOException {
+		if (reuse == null) {
+			reuse = new BoxedWrapperRow(1);
+		}
+		reuse.setLong(0, current);
+		current++;
+		return reuse;
+	}
+}
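In isolation, the format is used like this (Java API sketch; BatchTestBase#newRangeSource below does the same through the Scala API):

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.dataformat.BaseRow;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.typeutils.BaseRowTypeInfo;

    // A bounded, single-column source emitting the longs [0, 1000000).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<BaseRow> rows = env.createInput(
            new RangeInputFormat(0, 1000000),
            new BaseRowTypeInfo(new BigIntType()));
    rows.setParallelism(1);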
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
index 445e124..3225243 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowTestUtil.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.util;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryWriter;
 import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.types.InternalSerializers;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.typeutils.BaseRowTypeInfo;
@@ -99,7 +101,8 @@ public class BaseRowTestUtil {
 	}
 
 	public static void write(BinaryWriter writer, int pos, Object o, LogicalType type) {
-		BinaryWriter.write(writer, pos, o, type);
+		BinaryWriter.write(writer, pos, o, type,
+				InternalSerializers.create(type, new ExecutionConfig()));
 	}
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
index a0e7eec..54b5e0d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/SortAggITCase.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.runtime.batch.sql.agg
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.{TableConfigOptions, Types}
 import org.apache.flink.table.functions.AggregateFunction
@@ -27,8 +27,10 @@ import org.apache.flink.table.plan.util.JavaUserDefinedAggFunctions.WeightedAvgW
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.{MyPojo, MyToPojoFunc}
 import org.apache.flink.table.util.{CountAccumulator, CountAggFunction, IntSumAggFunction}
+
 import org.junit.{Ignore, Test}
 
+import java.lang
 import java.lang.{Iterable => JIterable}
 
 import scala.annotation.varargs
@@ -47,6 +49,26 @@ class SortAggITCase
     registerFunction("countFun", new CountAggFunction())
     registerFunction("intSumFun", new IntSumAggFunction())
     registerFunction("weightedAvg", new WeightedAvgWithMergeAndReset())
+
+    registerFunction("myPrimitiveArrayUdaf", new MyPrimitiveArrayUdaf())
+    registerFunction("myObjectArrayUdaf", new MyObjectArrayUdaf())
+    registerFunction("myNestedLongArrayUdaf", new MyNestedLongArrayUdaf())
+    registerFunction("myNestedStringArrayUdaf", new MyNestedStringArrayUdaf())
+
+    registerFunction("myPrimitiveMapUdaf", new MyPrimitiveMapUdaf())
+    registerFunction("myObjectMapUdaf", new MyObjectMapUdaf())
+    registerFunction("myNestedMapUdaf", new MyNestedMapUdf())
+  }
+
+  @Test
+  def testBigDataSimpleArrayUDAF(): Unit = {
+    tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
+    registerFunction("simplePrimitiveArrayUdaf", new SimplePrimitiveArrayUdaf())
+    registerRange("RangeT", 1000000)
+    env.setParallelism(1)
+    checkResult(
+      "SELECT simplePrimitiveArrayUdaf(id) FROM RangeT",
+      Seq(row(499999500000L)))
   }
 
   @Ignore
@@ -241,6 +263,60 @@ class SortAggITCase
       )
     )
   }
+
+  @Test
+  def testArrayUdaf(): Unit = {
+    tEnv.getConfig.getConf.setInteger(TableConfigOptions.SQL_RESOURCE_DEFAULT_PARALLELISM, 1)
+    env.setParallelism(1)
+    checkResult(
+      "SELECT myPrimitiveArrayUdaf(a, b) FROM Table3",
+      Seq(row(Array(231, 91)))
+    )
+    checkResult(
+      "SELECT myObjectArrayUdaf(c) FROM Table3",
+      Seq(row(Array("HHHHILCCCCCCCCCCCCCCC", "iod?.r123456789012345")))
+    )
+    checkResult(
+      "SELECT myNestedLongArrayUdaf(a, b)[2] FROM Table3",
+      Seq(row(Array(91, 231)))
+    )
+    checkResult(
+      "SELECT myNestedStringArrayUdaf(c)[2] FROM Table3",
+      Seq(row(Array("iod?.r123456789012345", "HHHHILCCCCCCCCCCCCCCC")))
+    )
+  }
+
+  @Test
+  def testMapUdaf(): Unit = {
+    checkResult(
+      "SELECT myPrimitiveMapUdaf(a, b)[3] FROM Table3",
+      Seq(row(15))
+    )
+    checkResult(
+      "SELECT myPrimitiveMapUdaf(a, b)[6] FROM Table3",
+      Seq(row(111))
+    )
+    checkResult(
+      "SELECT myObjectMapUdaf(a, c)['Co'] FROM Table3",
+      Seq(row(210))
+    )
+    checkResult(
+      "SELECT myObjectMapUdaf(a, c)['He'] FROM Table3",
+      Seq(row(9))
+    )
+    checkResult(
+      "SELECT myNestedMapUdaf(a, b, c)[6]['Co'] FROM Table3",
+      Seq(row(111))
+    )
+    checkResult(
+      "SELECT myNestedMapUdaf(a, b, c)[3]['He'] FROM Table3",
+      Seq(row(4))
+    )
+    checkResult(
+      "SELECT myNestedMapUdaf(a, b, c)[3]['Co'] FROM Table3",
+      Seq(row("null"))
+    )
+  }
 }
 
 class MyPojoAggFunction extends AggregateFunction[MyPojo, CountAccumulator] {
@@ -320,3 +396,172 @@ class VarArgsAggFunction extends AggregateFunction[Long, CountAccumulator] {
     new TupleTypeInfo[CountAccumulator](classOf[CountAccumulator], Types.LONG)
   }
 }
+
+class SimplePrimitiveArrayUdaf extends AggregateFunction[lang.Long, Array[Long]] {
+
+  var i = 0
+
+  override def createAccumulator(): Array[Long] = new Array[Long](10000)
+
+  override def getValue(accumulator: Array[Long]): lang.Long = Long.box(accumulator.sum)
+
+  def accumulate(accumulator: Array[Long], a: Long): Unit = {
+    accumulator(i) += a
+    i += 1
+    if (i >= accumulator.length) {
+      i = 0
+    }
+  }
+
+  override def getAccumulatorType: TypeInformation[Array[Long]] =
+    PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+
+  override def getResultType: TypeInformation[lang.Long] = Types.LONG
+}
+
+class MyPrimitiveArrayUdaf extends AggregateFunction[Array[Long], Array[Long]] {
+
+  override def createAccumulator(): Array[Long] = new Array[Long](2)
+
+  override def getValue(accumulator: Array[Long]): Array[Long] = accumulator
+
+  def accumulate(accumulator: Array[Long], a: Int, b: Long): Unit = {
+    accumulator(0) += a
+    accumulator(1) += b
+  }
+
+  override def getAccumulatorType: TypeInformation[Array[Long]] =
+    PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+
+  override def getResultType: TypeInformation[Array[Long]] =
+    PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+}
+
+class MyObjectArrayUdaf extends AggregateFunction[Array[String], Array[String]] {
+
+  override def createAccumulator(): Array[String] = Array("", "")
+
+  override def getValue(accumulator: Array[String]): Array[String] = accumulator
+
+  def accumulate(accumulator: Array[String], c: String): Unit = {
+    accumulator(0) = accumulator(0) + c.charAt(0)
+    accumulator(1) = accumulator(1) + c.charAt(c.length - 1)
+  }
+
+  override def getAccumulatorType: TypeInformation[Array[String]] =
+    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+
+  override def getResultType: TypeInformation[Array[String]] =
+    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO
+}
+
+class MyNestedLongArrayUdaf extends AggregateFunction[Array[Array[Long]], Array[Array[Long]]] {
+
+  override def createAccumulator(): Array[Array[Long]] = Array(Array(0, 0), Array(0, 0))
+
+  override def getValue(accumulator: Array[Array[Long]]): Array[Array[Long]] = accumulator
+
+  def accumulate(accumulator: Array[Array[Long]], a: Int, b: Long): Unit = {
+    accumulator(0)(0) += a
+    accumulator(0)(1) += b
+    accumulator(1)(0) += b
+    accumulator(1)(1) += a
+  }
+
+  override def getAccumulatorType =
+    ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)
+
+  override def getResultType = getAccumulatorType
+}
+
+class MyNestedStringArrayUdaf extends AggregateFunction[
+    Array[Array[String]], Array[Array[String]]] {
+
+  override def createAccumulator(): Array[Array[String]] = Array(Array("", ""), Array("", ""))
+
+  override def getValue(accumulator: Array[Array[String]]): Array[Array[String]] = accumulator
+
+  def accumulate(accumulator: Array[Array[String]], c: String): Unit = {
+    accumulator(0)(0) = accumulator(0)(0) + c.charAt(0)
+    accumulator(0)(1) = accumulator(0)(1) + c.charAt(c.length - 1)
+    accumulator(1)(0) = accumulator(1)(0) + c.charAt(c.length - 1)
+    accumulator(1)(1) = accumulator(1)(1) + c.charAt(0)
+  }
+
+  override def getAccumulatorType =
+    ObjectArrayTypeInfo.getInfoFor(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)
+
+  override def getResultType =
+    getAccumulatorType
+}
+
+class MyPrimitiveMapUdaf extends AggregateFunction[
+    java.util.Map[Long, Int], java.util.Map[Long, Int]] {
+
+  override def createAccumulator(): java.util.Map[Long, Int] =
+    new java.util.HashMap[Long, Int]()
+
+  override def getValue(accumulator: java.util.Map[Long, Int]): java.util.Map[Long, Int] =
+    accumulator
+
+  def accumulate(accumulator: java.util.Map[Long, Int], a: Int, b: Long): Unit = {
+    accumulator.putIfAbsent(b, 0)
+    accumulator.put(b, accumulator.get(b) + a)
+  }
+
+  override def getAccumulatorType =
+    new MapTypeInfo(Types.LONG, Types.INT)
+        .asInstanceOf[TypeInformation[java.util.Map[Long, Int]]]
+
+  override def getResultType =
+    getAccumulatorType
+}
+
+class MyObjectMapUdaf extends AggregateFunction[
+    java.util.Map[String, Int], java.util.Map[String, Int]] {
+
+  override def createAccumulator(): java.util.Map[String, Int] =
+    new java.util.HashMap[String, Int]()
+
+  override def getValue(accumulator: java.util.Map[String, Int]): java.util.Map[String, Int] =
+    accumulator
+
+  def accumulate(accumulator: java.util.Map[String, Int], a: Int, c: String): Unit = {
+    val key = c.substring(0, 2)
+    accumulator.putIfAbsent(key, 0)
+    accumulator.put(key, accumulator.get(key) + a)
+  }
+
+  override def getAccumulatorType =
+    new MapTypeInfo(Types.STRING, Types.INT)
+        .asInstanceOf[TypeInformation[java.util.Map[String, Int]]]
+
+  override def getResultType = getAccumulatorType
+}
+
+class MyNestedMapUdf extends AggregateFunction[
+    java.util.Map[Long, java.util.Map[String, Int]],
+    java.util.Map[Long, java.util.Map[String, Int]]] {
+
+  override def createAccumulator(): java.util.Map[Long, java.util.Map[String, Int]] =
+    new java.util.HashMap[Long, java.util.Map[String, Int]]()
+
+  override def getValue(accumulator: java.util.Map[Long, java.util.Map[String, Int]])
+  : java.util.Map[Long, java.util.Map[String, Int]] =
+    accumulator
+
+  def accumulate(
+      accumulator: java.util.Map[Long, java.util.Map[String, Int]],
+      a: Int, b: Long, c: String): Unit = {
+    val key = c.substring(0, 2)
+    accumulator.putIfAbsent(b, new java.util.HashMap[String, Int]())
+    accumulator.get(b).putIfAbsent(key, 0)
+    accumulator.get(b).put(key, accumulator.get(b).get(key) + a)
+  }
+
+  override def getAccumulatorType =
+    new MapTypeInfo(Types.LONG, new MapTypeInfo(Types.STRING, Types.INT))
+        .asInstanceOf[TypeInformation[java.util.Map[Long, java.util.Map[String, Int]]]]
+
+  override def getResultType = getAccumulatorType
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
index fb1c2c6..da88b70 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTableEnvUtil.scala
@@ -145,7 +145,7 @@ object BatchTableEnvUtil {
     * @param boundedStream The [[DataStream]] to register as table in the catalog.
     * @tparam T the type of the [[DataStream]].
     */
-  protected def registerBoundedStreamInternal[T](
+  def registerBoundedStreamInternal[T](
       tEnv: BatchTableEnvironment,
       name: String,
       boundedStream: DataStream[T],
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
index f97f5e6..88e63ea 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -22,17 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaExecEnv}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecEnv}
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
 import org.apache.flink.table.api.{SqlParserException, Table, TableConfig, TableConfigOptions, TableEnvironment}
-import org.apache.flink.table.dataformat.{BinaryRow, BinaryRowWriter}
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, BinaryRowWriter}
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.FlinkRelOptUtil
 import org.apache.flink.table.runtime.utils.BatchAbstractTestBase.DEFAULT_PARALLELISM
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{BigIntType, LogicalType}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.util.{BaseRowTestUtil, DiffRepository, TableTestUtil}
 import org.apache.flink.types.Row
 
@@ -431,6 +432,23 @@ class BatchTestBase extends BatchAbstractTestBase {
       f: AggregateFunction[T, ACC]): Unit = {
     tEnv.registerFunction(name, f)
   }
+
+  def registerRange(name: String, end: Long): Unit = {
+    registerRange(name, 0, end)
+  }
+
+  def registerRange(name: String, start: Long, end: Long): Unit = {
+    BatchTableEnvUtil.registerBoundedStreamInternal(
+      tEnv, name, newRangeSource(start, end).javaStream,
+      Some[Array[String]](Array[String]("id")), None, None)
+  }
+
+  def newRangeSource(start: Long, end: Long): DataStream[BaseRow] = {
+    implicit val typeInfo: TypeInformation[BaseRow] = new BaseRowTypeInfo(new BigIntType)
+    val boundedStream = env.createInput(new RangeInputFormat(start, end))
+    boundedStream.setParallelism(1)
+    boundedStream
+  }
 }
 
 object BatchTestBase {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index 9c66826..d45da77 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -20,7 +20,8 @@ package org.apache.flink.table.dataformat;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
 import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.util.SegmentsUtil;
 
@@ -97,13 +98,15 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 	}
 
 	@Override
-	public void writeArray(int pos, BinaryArray input) {
-		writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
+	public void writeArray(int pos, BaseArray input, BaseArraySerializer serializer) {
+		BinaryArray binary = serializer.toBinaryArray(input);
+		writeSegmentsToVarLenPart(pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
 	}
 
 	@Override
-	public void writeMap(int pos, BinaryMap input) {
-		writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
+	public void writeMap(int pos, BaseMap input, BaseMapSerializer serializer) {
+		BinaryMap binary = serializer.toBinaryMap(input);
+		writeSegmentsToVarLenPart(pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
 	}
 
 	private DataOutputViewStreamWrapper getOutputView() {
@@ -135,12 +138,12 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 	}
 
 	@Override
-	public void writeRow(int pos, BaseRow input, RowType type) {
+	public void writeRow(int pos, BaseRow input, BaseRowSerializer serializer) {
 		if (input instanceof BinaryFormat) {
 			BinaryFormat row = (BinaryFormat) input;
 			writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
 		} else {
-			BinaryRow row = BaseRowSerializer.baseRowToBinary(input, type);
+			BinaryRow row = serializer.toBinaryRow(input);
 			writeSegmentsToVarLenPart(pos, row.getSegments(), row.getOffset(), row.getSizeInBytes());
 		}
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java
new file mode 100644
index 0000000..75a4c1c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseArray.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * An interface for arrays used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * after serialization the data takes the {@link BinaryArray} format, while the
+ * {@link GenericArray} format allows convenient updates.
+ */
+public interface BaseArray extends TypeGetterSetters {
+
+	int numElements();
+
+	boolean isNullAt(int pos);
+
+	void setNullAt(int pos);
+
+	void setNotNullAt(int pos);
+
+	void setNullLong(int pos);
+
+	void setNullInt(int pos);
+
+	void setNullBoolean(int pos);
+
+	void setNullByte(int pos);
+
+	void setNullShort(int pos);
+
+	void setNullFloat(int pos);
+
+	void setNullDouble(int pos);
+
+	boolean[] toBooleanArray();
+
+	byte[] toByteArray();
+
+	short[] toShortArray();
+
+	int[] toIntArray();
+
+	long[] toLongArray();
+
+	float[] toFloatArray();
+
+	double[] toDoubleArray();
+}
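To make the contract concrete, a sketch that builds a BinaryArray and reads it back purely through the BaseArray interface (the (array, numElements, elementSize) constructor of BinaryArrayWriter is assumed from its use elsewhere in the codebase):

    import org.apache.flink.table.dataformat.BaseArray;
    import org.apache.flink.table.dataformat.BinaryArray;
    import org.apache.flink.table.dataformat.BinaryArrayWriter;

    // Build a 3-element BIGINT array in the binary format...
    BinaryArray array = new BinaryArray();
    BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8); // 8 bytes per BIGINT
    writer.writeLong(0, 1L);
    writer.writeLong(1, 2L);
    writer.writeLong(2, 3L);
    writer.complete();

    // ...and consume it through the interface; a GenericArray reads identically.
    BaseArray base = array;
    long[] values = base.toLongArray(); // => {1L, 2L, 3L}; throws if any element is null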
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java
new file mode 100644
index 0000000..3ad542c
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BaseMap.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Map;
+
+/**
+ * An interface for maps used internally in Flink Table/SQL.
+ *
+ * <p>There are different implementations depending on the scenario:
+ * after serialization the data takes the {@link BinaryMap} format, while the
+ * {@link GenericMap} format allows convenient updates.
+ */
+public interface BaseMap {
+
+	/**
+	 * Invoked by generated code.
+	 */
+	int numElements();
+
+	/**
+	 * Returns a Java map containing INTERNAL type data.
+	 * To obtain a Java map containing external type data, use the converters.
+	 */
+	Map toJavaMap(LogicalType keyType, LogicalType valueType);
+
+	// NOTE:
+	//
+	// As the binary map has format-specific `get` and `toString` implementations,
+	// we do not provide these methods in the interface.
+	// Instead, we implement them in codegen.
+	//
+	// `get` is implemented in ScalarOperatorGens -> generateMapGet()
+	// `toString` is implemented in ScalarOperatorGens -> generateCast()
+}
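A sketch of the toJavaMap contract (illustrative; note the returned map holds INTERNAL data, e.g. VARCHAR values come back as BinaryString rather than java.lang.String):

    import org.apache.flink.table.dataformat.BaseMap;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.VarCharType;

    import java.util.Map;

    // Works for both BinaryMap and GenericMap through the common interface.
    static Map inspect(BaseMap map) {
        return map.toJavaMap(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
    }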
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index fbf81b1..fbda19c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -23,6 +23,8 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.util.SegmentsUtil;
 
+import java.lang.reflect.Array;
+
 import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
 
 /**
@@ -33,7 +35,7 @@ import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
  *
  * <p>{@code BinaryArray} are influenced by Apache Spark UnsafeArrayData.
  */
-public final class BinaryArray extends BinaryFormat implements TypeGetterSetters {
+public final class BinaryArray extends BinaryFormat implements BaseArray {
 
 	/**
 	 * Offset for Arrays.
@@ -91,6 +93,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return elementOffset + ordinal * elementSize;
 	}
 
+	@Override
 	public int numElements() {
 		return numElements;
 	}
@@ -120,6 +123,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
 	}
 
+	@Override
 	public void setNotNullAt(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitUnSet(segments, offset + 4, pos);
@@ -138,6 +142,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setLong(segments, getElementOffset(pos, 8), value);
 	}
 
+	@Override
 	public void setNullLong(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -157,6 +162,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setInt(segments, getElementOffset(pos, 4), value);
 	}
 
+	@Override
 	public void setNullInt(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -204,13 +210,13 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 	}
 
 	@Override
-	public BinaryArray getArray(int pos) {
+	public BaseArray getArray(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
 	}
 
 	@Override
-	public BinaryMap getMap(int pos) {
+	public BaseMap getMap(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
 	}
@@ -237,6 +243,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setBoolean(segments, getElementOffset(pos, 1), value);
 	}
 
+	@Override
 	public void setNullBoolean(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -256,6 +263,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setByte(segments, getElementOffset(pos, 1), value);
 	}
 
+	@Override
 	public void setNullByte(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -275,6 +283,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setShort(segments, getElementOffset(pos, 2), value);
 	}
 
+	@Override
 	public void setNullShort(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -294,6 +303,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setFloat(segments, getElementOffset(pos, 4), value);
 	}
 
+	@Override
 	public void setNullFloat(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -313,6 +323,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		SegmentsUtil.setDouble(segments, getElementOffset(pos, 8), value);
 	}
 
+	@Override
 	public void setNullDouble(int pos) {
 		assertIndexIsValid(pos);
 		SegmentsUtil.bitSet(segments, offset + 4, pos);
@@ -365,6 +376,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		}
 	}
 
+	@Override
 	public boolean[] toBooleanArray() {
 		checkNoNull();
 		boolean[] values = new boolean[numElements];
@@ -373,6 +385,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public byte[] toByteArray() {
 		checkNoNull();
 		byte[] values = new byte[numElements];
@@ -381,6 +394,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public short[] toShortArray() {
 		checkNoNull();
 		short[] values = new short[numElements];
@@ -389,6 +403,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public int[] toIntArray() {
 		checkNoNull();
 		int[] values = new int[numElements];
@@ -397,6 +412,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public long[] toLongArray() {
 		checkNoNull();
 		long[] values = new long[numElements];
@@ -405,6 +421,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public float[] toFloatArray() {
 		checkNoNull();
 		float[] values = new float[numElements];
@@ -413,6 +430,7 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	@Override
 	public double[] toDoubleArray() {
 		checkNoNull();
 		double[] values = new double[numElements];
@@ -421,6 +439,17 @@ public final class BinaryArray extends BinaryFormat implements TypeGetterSetters
 		return values;
 	}
 
+	public <T> T[] toClassArray(LogicalType elementType, Class<T> elementClass) {
+		int size = numElements();
+		T[] values = (T[]) Array.newInstance(elementClass, size);
+		for (int i = 0; i < size; i++) {
+			if (!isNullAt(i)) {
+				values[i] = (T) TypeGetterSetters.get(this, i, elementType);
+			}
+		}
+		return values;
+	}
+
 	public BinaryArray copy() {
 		return copy(new BinaryArray());
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
index 590a422..ee0529a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArrayWriter.java
@@ -208,4 +208,8 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter {
 	public void complete() {
 		array.pointTo(segment, 0, cursor);
 	}
+
+	public int getNumElements() {
+		return numElements;
+	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
index 272fd27..8da32db 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.dataformat;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.util.SegmentsUtil;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -29,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
  */
-public final class BinaryMap extends BinaryFormat {
+public final class BinaryMap extends BinaryFormat implements BaseMap {
 
 	private final BinaryArray keys;
 	private final BinaryArray values;
@@ -69,6 +74,18 @@ public final class BinaryMap extends BinaryFormat {
 		return values;
 	}
 
+	@Override
+	public Map<Object, Object> toJavaMap(LogicalType keyType, LogicalType valueType) {
+		Object[] keyArray = keys.toClassArray(keyType, getInternalClassForType(keyType));
+		Object[] valueArray = values.toClassArray(valueType, getInternalClassForType(valueType));
+
+		Map<Object, Object> map = new HashMap<>();
+		for (int i = 0; i < keyArray.length; i++) {
+			map.put(keyArray[i], valueArray[i]);
+		}
+		return map;
+	}
+
 	public BinaryMap copy() {
 		return copy(new BinaryMap());
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index 358b1fc..5d67d2f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -318,13 +318,13 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int pos) {
+	public BaseArray getArray(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
 	}
 
 	@Override
-	public BinaryMap getMap(int pos) {
+	public BaseMap getMap(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 16fef15..ea97673 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -17,9 +17,12 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
 
 /**
  * Writer to write a composite data format, like row, array.
@@ -59,11 +62,11 @@ public interface BinaryWriter {
 
 	void writeDecimal(int pos, Decimal value, int precision);
 
-	void writeArray(int pos, BinaryArray value);
+	void writeArray(int pos, BaseArray value, BaseArraySerializer serializer);
 
-	void writeMap(int pos, BinaryMap value);
+	void writeMap(int pos, BaseMap value, BaseMapSerializer serializer);
 
-	void writeRow(int pos, BaseRow value, RowType type);
+	void writeRow(int pos, BaseRow value, BaseRowSerializer type);
 
 	void writeGeneric(int pos, BinaryGeneric value);
 
@@ -72,7 +75,8 @@ public interface BinaryWriter {
 	 */
 	void complete();
 
-	static void write(BinaryWriter writer, int pos, Object o, LogicalType type) {
+	static void write(BinaryWriter writer, int pos,
+			Object o, LogicalType type, TypeSerializer serializer) {
 		switch (type.getTypeRoot()) {
 			case BOOLEAN:
 				writer.writeBoolean(pos, (boolean) o);
@@ -109,15 +113,14 @@ public interface BinaryWriter {
 				writer.writeDecimal(pos, (Decimal) o, decimalType.getPrecision());
 				break;
 			case ARRAY:
-				writer.writeArray(pos, (BinaryArray) o);
+				writer.writeArray(pos, (BaseArray) o, (BaseArraySerializer) serializer);
 				break;
 			case MAP:
 			case MULTISET:
-				writer.writeMap(pos, (BinaryMap) o);
+				writer.writeMap(pos, (BaseMap) o, (BaseMapSerializer) serializer);
 				break;
 			case ROW:
-				RowType rowType = (RowType) type;
-				writer.writeRow(pos, (BaseRow) o, rowType);
+				writer.writeRow(pos, (BaseRow) o, (BaseRowSerializer) serializer);
 				break;
 			case ANY:
 				writer.writeGeneric(pos, (BinaryGeneric) o);
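
BinaryWriter.write() now threads the matching serializer through so composite values (ARRAY, MAP/MULTISET, ROW) can be encoded even when they arrive as generic rather than binary instances. A minimal caller sketch, assuming `writer` is an in-scope BinaryRowWriter with an ARRAY<INT> field at position 0:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.table.dataformat.BinaryWriter;
    import org.apache.flink.table.dataformat.GenericArray;
    import org.apache.flink.table.types.logical.ArrayType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.typeutils.BaseArraySerializer;

    // Sketch: the serializer lets the writer normalize a GenericArray to binary form.
    ArrayType type = new ArrayType(new IntType());
    BaseArraySerializer ser = new BaseArraySerializer(new IntType(), new ExecutionConfig());
    BinaryWriter.write(writer, 0, new GenericArray(new int[]{1, 2, 3}, 3), type, ser);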
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
index 8764c98..3fed637 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
@@ -140,13 +140,13 @@ public final class ColumnarRow implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int ordinal) {
+	public BaseArray getArray(int ordinal) {
 		// TODO
 		throw new UnsupportedOperationException("Array is not supported.");
 	}
 
 	@Override
-	public BinaryMap getMap(int ordinal) {
+	public BaseMap getMap(int ordinal) {
 		// TODO
 		throw new UnsupportedOperationException("Map is not supported.");
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index 35f9b70..af32a51 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.InternalSerializers;
 import org.apache.flink.table.types.KeyValueDataType;
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -44,6 +45,8 @@ import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
 import org.apache.flink.types.Row;
 
+import org.apache.commons.lang3.ArrayUtils;
+
 import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
@@ -233,7 +236,7 @@ public class DataFormatConverters {
 				}
 				return new GenericConverter(typeInfo.createSerializer(new ExecutionConfig()));
 			default:
-				throw new RuntimeException("Not support dataType: " + originDataType);
+				throw new RuntimeException("Unsupported dataType: " + dataType);
 		}
 	}
 
@@ -466,7 +469,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for BinaryArray.
 	 */
-	public static final class BinaryArrayConverter extends IdentityConverter<BinaryArray> {
+	public static final class BinaryArrayConverter extends IdentityConverter<BaseArray> {
 
 		private static final long serialVersionUID = -7790350668043604641L;
 
@@ -475,7 +478,7 @@ public class DataFormatConverters {
 		private BinaryArrayConverter() {}
 
 		@Override
-		BinaryArray toExternalImpl(BaseRow row, int column) {
+		BaseArray toExternalImpl(BaseRow row, int column) {
 			return row.getArray(column);
 		}
 	}
@@ -483,7 +486,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for BinaryMap.
 	 */
-	public static final class BinaryMapConverter extends IdentityConverter<BinaryMap> {
+	public static final class BinaryMapConverter extends IdentityConverter<BaseMap> {
 
 		private static final long serialVersionUID = -9114231688474126815L;
 
@@ -492,7 +495,7 @@ public class DataFormatConverters {
 		private BinaryMapConverter() {}
 
 		@Override
-		BinaryMap toExternalImpl(BaseRow row, int column) {
+		BaseMap toExternalImpl(BaseRow row, int column) {
 			return row.getMap(column);
 		}
 	}
@@ -706,7 +709,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive int array.
 	 */
-	public static final class PrimitiveIntArrayConverter extends DataFormatConverter<BinaryArray, int[]> {
+	public static final class PrimitiveIntArrayConverter extends DataFormatConverter<BaseArray, int[]> {
 
 		private static final long serialVersionUID = 1780941126232395638L;
 
@@ -715,12 +718,12 @@ public class DataFormatConverters {
 		private PrimitiveIntArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(int[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(int[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		int[] toExternalImpl(BinaryArray value) {
+		int[] toExternalImpl(BaseArray value) {
 			return value.toIntArray();
 		}
 
@@ -733,7 +736,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive boolean array.
 	 */
-	public static final class PrimitiveBooleanArrayConverter extends DataFormatConverter<BinaryArray, boolean[]> {
+	public static final class PrimitiveBooleanArrayConverter extends DataFormatConverter<BaseArray, boolean[]> {
 
 		private static final long serialVersionUID = -4037693692440282141L;
 
@@ -742,12 +745,12 @@ public class DataFormatConverters {
 		private PrimitiveBooleanArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(boolean[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(boolean[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		boolean[] toExternalImpl(BinaryArray value) {
+		boolean[] toExternalImpl(BaseArray value) {
 			return value.toBooleanArray();
 		}
 
@@ -777,7 +780,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive short array.
 	 */
-	public static final class PrimitiveShortArrayConverter extends DataFormatConverter<BinaryArray, short[]> {
+	public static final class PrimitiveShortArrayConverter extends DataFormatConverter<BaseArray, short[]> {
 
 		private static final long serialVersionUID = -1343184089311186834L;
 
@@ -786,12 +789,12 @@ public class DataFormatConverters {
 		private PrimitiveShortArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(short[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(short[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		short[] toExternalImpl(BinaryArray value) {
+		short[] toExternalImpl(BaseArray value) {
 			return value.toShortArray();
 		}
 
@@ -804,7 +807,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive long array.
 	 */
-	public static final class PrimitiveLongArrayConverter extends DataFormatConverter<BinaryArray, long[]> {
+	public static final class PrimitiveLongArrayConverter extends DataFormatConverter<BaseArray, long[]> {
 
 		private static final long serialVersionUID = 4061982985342526078L;
 
@@ -813,12 +816,12 @@ public class DataFormatConverters {
 		private PrimitiveLongArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(long[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(long[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		long[] toExternalImpl(BinaryArray value) {
+		long[] toExternalImpl(BaseArray value) {
 			return value.toLongArray();
 		}
 
@@ -831,7 +834,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive float array.
 	 */
-	public static final class PrimitiveFloatArrayConverter extends DataFormatConverter<BinaryArray, float[]> {
+	public static final class PrimitiveFloatArrayConverter extends DataFormatConverter<BaseArray, float[]> {
 
 		private static final long serialVersionUID = -3237695040861141459L;
 
@@ -840,12 +843,12 @@ public class DataFormatConverters {
 		private PrimitiveFloatArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(float[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(float[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		float[] toExternalImpl(BinaryArray value) {
+		float[] toExternalImpl(BaseArray value) {
 			return value.toFloatArray();
 		}
 
@@ -858,7 +861,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for primitive double array.
 	 */
-	public static final class PrimitiveDoubleArrayConverter extends DataFormatConverter<BinaryArray, double[]> {
+	public static final class PrimitiveDoubleArrayConverter extends DataFormatConverter<BaseArray, double[]> {
 
 		private static final long serialVersionUID = 6333670535356315691L;
 
@@ -867,12 +870,12 @@ public class DataFormatConverters {
 		private PrimitiveDoubleArrayConverter() {}
 
 		@Override
-		BinaryArray toInternalImpl(double[] value) {
-			return BinaryArray.fromPrimitiveArray(value);
+		BaseArray toInternalImpl(double[] value) {
+			return new GenericArray(value, value.length, true);
 		}
 
 		@Override
-		double[] toExternalImpl(BinaryArray value) {
+		double[] toExternalImpl(BaseArray value) {
 			return value.toDoubleArray();
 		}
 
@@ -885,7 +888,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for object array.
 	 */
-	public static final class ObjectArrayConverter<T> extends DataFormatConverter<BinaryArray, T[]> {
+	public static final class ObjectArrayConverter<T> extends DataFormatConverter<BaseArray, T[]> {
 
 		private static final long serialVersionUID = -7434682160639380078L;
 
@@ -893,33 +896,52 @@ public class DataFormatConverters {
 		private final LogicalType elementType;
 		private final DataFormatConverter<Object, T> elementConverter;
 		private final int elementSize;
+		private final TypeSerializer<T> eleSer;
+		private final boolean isEleIdentity;
+
+		private transient BinaryArray reuseArray;
+		private transient BinaryArrayWriter reuseWriter;
 
 		public ObjectArrayConverter(DataType elementType) {
 			this.componentClass = (Class) elementType.getConversionClass();
 			this.elementType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(elementType);
 			this.elementConverter = DataFormatConverters.getConverterForDataType(elementType);
-			this.elementSize = BinaryArray.calculateFixLengthPartSize(elementType.getLogicalType());
+			this.elementSize = BinaryArray.calculateFixLengthPartSize(this.elementType);
+			this.eleSer = InternalSerializers.create(this.elementType, new ExecutionConfig());
+			this.isEleIdentity = elementConverter instanceof IdentityConverter;
 		}
 
 		@Override
-		BinaryArray toInternalImpl(T[] value) {
-			BinaryArray array = new BinaryArray();
-			BinaryArrayWriter writer = new BinaryArrayWriter(array, value.length, elementSize);
+		BaseArray toInternalImpl(T[] value) {
+			return isEleIdentity ? new GenericArray(value, value.length) : toBinaryArray(value);
+		}
+
+		private BaseArray toBinaryArray(T[] value) {
+			if (reuseArray == null) {
+				reuseArray = new BinaryArray();
+			}
+			if (reuseWriter == null || reuseWriter.getNumElements() != value.length) {
+				reuseWriter = new BinaryArrayWriter(reuseArray, value.length, elementSize);
+			} else {
+				reuseWriter.reset();
+			}
 			for (int i = 0; i < value.length; i++) {
 				Object field = value[i];
 				if (field == null) {
-					writer.setNullAt(i, elementType);
+					reuseWriter.setNullAt(i, elementType);
 				} else {
-					BinaryWriter.write(writer, i, elementConverter.toInternalImpl(value[i]), elementType);
+					BinaryWriter.write(reuseWriter, i, elementConverter.toInternalImpl(value[i]), elementType, eleSer);
 				}
 			}
-			writer.complete();
-			return array;
+			reuseWriter.complete();
+			return reuseArray;
 		}
 
 		@Override
-		T[] toExternalImpl(BinaryArray value) {
-			return binaryArrayToJavaArray(value, elementType, componentClass, elementConverter);
+		T[] toExternalImpl(BaseArray value) {
+			return (isEleIdentity && value instanceof GenericArray) ?
+					genericArrayToJavaArray((GenericArray) value, elementType) :
+					binaryArrayToJavaArray((BinaryArray) value, elementType, componentClass, elementConverter);
 		}
 
 		@Override
@@ -928,6 +950,32 @@ public class DataFormatConverters {
 		}
 	}
 
+	private static <T> T[] genericArrayToJavaArray(GenericArray value, LogicalType eleType) {
+		Object array = value.getArray();
+		if (value.isPrimitiveArray()) {
+			switch (eleType.getTypeRoot()) {
+				case BOOLEAN:
+					return (T[]) ArrayUtils.toObject((boolean[]) array);
+				case TINYINT:
+					return (T[]) ArrayUtils.toObject((byte[]) array);
+				case SMALLINT:
+					return (T[]) ArrayUtils.toObject((short[]) array);
+				case INTEGER:
+					return (T[]) ArrayUtils.toObject((int[]) array);
+				case BIGINT:
+					return (T[]) ArrayUtils.toObject((long[]) array);
+				case FLOAT:
+					return (T[]) ArrayUtils.toObject((float[]) array);
+				case DOUBLE:
+					return (T[]) ArrayUtils.toObject((double[]) array);
+				default:
+					throw new RuntimeException("Not a primitive type: " + eleType);
+			}
+		} else {
+			return (T[]) array;
+		}
+	}
+
 	private static <T> T[] binaryArrayToJavaArray(BinaryArray value, LogicalType elementType,
 			Class<T> componentClass, DataFormatConverter<Object, T> elementConverter) {
 		int size = value.numElements();
@@ -946,7 +994,7 @@ public class DataFormatConverters {
 	/**
 	 * Converter for map.
 	 */
-	public static final class MapConverter extends DataFormatConverter<BinaryMap, Map> {
+	public static final class MapConverter extends DataFormatConverter<BaseMap, Map> {
 
 		private static final long serialVersionUID = -916429669828309919L;
 
@@ -962,6 +1010,16 @@ public class DataFormatConverters {
 		private final Class keyComponentClass;
 		private final Class valueComponentClass;
 
+		private final TypeSerializer keySer;
+		private final TypeSerializer valueSer;
+
+		private final boolean isKeyValueIdentity;
+
+		private transient BinaryArray reuseKArray;
+		private transient BinaryArrayWriter reuseKWriter;
+		private transient BinaryArray reuseVArray;
+		private transient BinaryArrayWriter reuseVWriter;
+
 		public MapConverter(DataType keyTypeInfo, DataType valueTypeInfo) {
 			this.keyType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(keyTypeInfo);
 			this.valueType = LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(valueTypeInfo);
@@ -971,38 +1029,58 @@ public class DataFormatConverters {
 			this.valueElementSize = BinaryArray.calculateFixLengthPartSize(valueType);
 			this.keyComponentClass = keyTypeInfo.getConversionClass();
 			this.valueComponentClass = valueTypeInfo.getConversionClass();
+			this.isKeyValueIdentity = keyConverter instanceof IdentityConverter &&
+					valueConverter instanceof IdentityConverter;
+			this.keySer = InternalSerializers.create(this.keyType, new ExecutionConfig());
+			this.valueSer = InternalSerializers.create(this.valueType, new ExecutionConfig());
 		}
 
 		@Override
-		BinaryMap toInternalImpl(Map value) {
-			BinaryArray keyArray = new BinaryArray();
-			BinaryArrayWriter keyWriter = new BinaryArrayWriter(keyArray, value.size(), keyElementSize);
+		BaseMap toInternalImpl(Map value) {
+			return isKeyValueIdentity ? new GenericMap(value) : toBinaryMap(value);
+		}
 
-			BinaryArray valueArray = new BinaryArray();
-			BinaryArrayWriter valueWriter = new BinaryArrayWriter(valueArray, value.size(), valueElementSize);
+		private BinaryMap toBinaryMap(Map value) {
+			if (reuseKArray == null) {
+				reuseKArray = new BinaryArray();
+				reuseVArray = new BinaryArray();
+			}
+			if (reuseKWriter == null || reuseKWriter.getNumElements() != value.size()) {
+				reuseKWriter = new BinaryArrayWriter(reuseKArray, value.size(), keyElementSize);
+				reuseVWriter = new BinaryArrayWriter(reuseVArray, value.size(), valueElementSize);
+			} else {
+				reuseKWriter.reset();
+				reuseVWriter.reset();
+			}
 
 			int i = 0;
 			for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) value).entrySet()) {
 				if (entry.getKey() == null) {
-					keyWriter.setNullAt(i, keyType);
+					reuseKWriter.setNullAt(i, keyType);
 				} else {
-					BinaryWriter.write(keyWriter, i, keyConverter.toInternalImpl(entry.getKey()), keyType);
+					BinaryWriter.write(reuseKWriter, i, keyConverter.toInternalImpl(entry.getKey()), keyType, keySer);
 				}
 				if (entry.getValue() == null) {
-					valueWriter.setNullAt(i, valueType);
+					reuseVWriter.setNullAt(i, valueType);
 				} else {
-					BinaryWriter.write(valueWriter, i, valueConverter.toInternalImpl(entry.getValue()), valueType);
+					BinaryWriter.write(reuseVWriter, i, valueConverter.toInternalImpl(entry.getValue()), valueType, valueSer);
 				}
 				i++;
 			}
 
-			keyWriter.complete();
-			valueWriter.complete();
-			return BinaryMap.valueOf(keyArray, valueArray);
+			reuseKWriter.complete();
+			reuseVWriter.complete();
+			return BinaryMap.valueOf(reuseKArray, reuseVArray);
 		}
 
 		@Override
-		Map toExternalImpl(BinaryMap value) {
+		Map toExternalImpl(BaseMap value) {
+			return (isKeyValueIdentity && value instanceof GenericMap) ?
+					((GenericMap) value).getMap() :
+					binaryMapToMap((BinaryMap) value);
+		}
+
+		private Map binaryMapToMap(BinaryMap value) {
 			Map<Object, Object> map = new HashMap<>();
 			Object[] keys = binaryArrayToJavaArray(value.keyArray(), keyType, keyComponentClass, keyConverter);
 			Object[] values = binaryArrayToJavaArray(value.valueArray(), valueType, valueComponentClass, valueConverter);
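
The converter changes above add an identity fast path: when both key and value converters are identity converters, toInternal() wraps the caller's Map in a GenericMap and toExternal() unwraps it, so no per-entry copying or binary encoding occurs. A hedged round-trip sketch, assuming `mapConverter` is a MapConverter built for INT keys and INT values:

    // Sketch: identity key/value types mean the external Map is wrapped, not copied.
    Map<Object, Object> external = new HashMap<>();
    external.put(1, 2);
    BaseMap internal = mapConverter.toInternal(external);  // a GenericMap wrapper
    Map roundTripped = mapConverter.toExternal(internal);  // same underlying Map instance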
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java
new file mode 100644
index 0000000..94077be
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericArray.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A GenericArray is an array where all the elements have the same type.
+ * It can be considered a wrapper around a normal Java array.
+ */
+public class GenericArray implements BaseArray {
+
+	private final Object arr;
+	private final int numElements;
+	private final boolean isPrimitiveArray;
+
+	public GenericArray(Object arr, int numElements) {
+		this(arr, numElements, isPrimitiveArray(arr));
+	}
+
+	public GenericArray(Object arr, int numElements, boolean isPrimitiveArray) {
+		this.arr = arr;
+		this.numElements = numElements;
+		this.isPrimitiveArray = isPrimitiveArray;
+	}
+
+	private static boolean isPrimitiveArray(Object arr) {
+		checkNotNull(arr);
+		checkArgument(arr.getClass().isArray());
+		return arr.getClass().getComponentType().isPrimitive();
+	}
+
+	public boolean isPrimitiveArray() {
+		return isPrimitiveArray;
+	}
+
+	public Object getArray() {
+		return arr;
+	}
+
+	@Override
+	public int numElements() {
+		return numElements;
+	}
+
+	@Override
+	public boolean isNullAt(int pos) {
+		return !isPrimitiveArray && ((Object[]) arr)[pos] == null;
+	}
+
+	@Override
+	public void setNullAt(int pos) {
+		checkState(!isPrimitiveArray, "Can't set null for primitive array");
+		((Object[]) arr)[pos] = null;
+	}
+
+	@Override
+	public void setNotNullAt(int pos) {
+		// do nothing, as an update will follow immediately
+	}
+
+	@Override
+	public void setNullLong(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullInt(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullBoolean(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullByte(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullShort(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullFloat(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public void setNullDouble(int pos) {
+		setNullAt(pos);
+	}
+
+	@Override
+	public boolean[] toBooleanArray() {
+		return (boolean[]) arr;
+	}
+
+	@Override
+	public byte[] toByteArray() {
+		return (byte[]) arr;
+	}
+
+	@Override
+	public short[] toShortArray() {
+		return (short[]) arr;
+	}
+
+	@Override
+	public int[] toIntArray() {
+		return (int[]) arr;
+	}
+
+	@Override
+	public long[] toLongArray() {
+		return (long[]) arr;
+	}
+
+	@Override
+	public float[] toFloatArray() {
+		return (float[]) arr;
+	}
+
+	@Override
+	public double[] toDoubleArray() {
+		return (double[]) arr;
+	}
+
+	@Override
+	public boolean getBoolean(int pos) {
+		return isPrimitiveArray ? ((boolean[]) arr)[pos] : (boolean) getObject(pos);
+	}
+
+	@Override
+	public byte getByte(int pos) {
+		return isPrimitiveArray ? ((byte[]) arr)[pos] : (byte) getObject(pos);
+	}
+
+	@Override
+	public short getShort(int pos) {
+		return isPrimitiveArray ? ((short[]) arr)[pos] : (short) getObject(pos);
+	}
+
+	@Override
+	public int getInt(int pos) {
+		return isPrimitiveArray ? ((int[]) arr)[pos] : (int) getObject(pos);
+	}
+
+	@Override
+	public long getLong(int pos) {
+		return isPrimitiveArray ? ((long[]) arr)[pos] : (long) getObject(pos);
+	}
+
+	@Override
+	public float getFloat(int pos) {
+		return isPrimitiveArray ? ((float[]) arr)[pos] : (float) getObject(pos);
+	}
+
+	@Override
+	public double getDouble(int pos) {
+		return isPrimitiveArray ? ((double[]) arr)[pos] : (double) getObject(pos);
+	}
+
+	@Override
+	public byte[] getBinary(int pos) {
+		return (byte[]) getObject(pos);
+	}
+
+	@Override
+	public BinaryString getString(int pos) {
+		return (BinaryString) getObject(pos);
+	}
+
+	@Override
+	public Decimal getDecimal(int pos, int precision, int scale) {
+		return (Decimal) getObject(pos);
+	}
+
+	@Override
+	public <T> BinaryGeneric<T> getGeneric(int pos) {
+		return (BinaryGeneric) getObject(pos);
+	}
+
+	@Override
+	public BaseRow getRow(int pos, int numFields) {
+		return (BaseRow) getObject(pos);
+	}
+
+	@Override
+	public BaseArray getArray(int pos) {
+		return (BaseArray) getObject(pos);
+	}
+
+	@Override
+	public BaseMap getMap(int pos) {
+		return (BaseMap) getObject(pos);
+	}
+
+	@Override
+	public void setBoolean(int pos, boolean value) {
+		if (isPrimitiveArray) {
+			((boolean[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setByte(int pos, byte value) {
+		if (isPrimitiveArray) {
+			((byte[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setShort(int pos, short value) {
+		if (isPrimitiveArray) {
+			((short[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setInt(int pos, int value) {
+		if (isPrimitiveArray) {
+			((int[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setLong(int pos, long value) {
+		if (isPrimitiveArray) {
+			((long[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setFloat(int pos, float value) {
+		if (isPrimitiveArray) {
+			((float[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setDouble(int pos, double value) {
+		if (isPrimitiveArray) {
+			((double[]) arr)[pos] = value;
+		} else {
+			setObject(pos, value);
+		}
+	}
+
+	@Override
+	public void setDecimal(int pos, Decimal value, int precision) {
+		setObject(pos, value);
+	}
+
+	public Object getObject(int pos) {
+		return ((Object[]) arr)[pos];
+	}
+
+	public void setObject(int pos, Object value) {
+		((Object[]) arr)[pos] = value;
+	}
+}
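
GenericArray reads and writes directly against the wrapped java array, so no binary encoding happens until a serializer asks for it. A short usage sketch:

    import org.apache.flink.table.dataformat.GenericArray;

    int[] primitives = {1, 2, 3};
    GenericArray array = new GenericArray(primitives, primitives.length);
    // isPrimitiveArray() was derived from the component type, so this is true.
    int second = array.getInt(1);     // reads straight from the int[]
    array.setInt(1, 20);              // writes straight back, no copy
    int[] same = array.toIntArray();  // returns the wrapped array itself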
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
similarity index 53%
copy from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
copy to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
index 8fdb7b0..fb0217b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericMap.java
@@ -16,38 +16,44 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.typeutils;
+package org.apache.flink.table.dataformat;
 
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Map;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A GenericMap is a map where all the keys have the same type, and all the values have the same type.
+ * It can be considered a wrapper around a normal Java map.
  */
-public class DecimalSerializerTest extends SerializerTestBase<Decimal> {
+public class GenericMap implements BaseMap {
 
-	@Override
-	protected DecimalSerializer createSerializer() {
-		return new DecimalSerializer(5, 2);
+	private final Map<Object, Object> map;
+
+	public GenericMap(Map<Object, Object> map) {
+		this.map = map;
 	}
 
 	@Override
-	protected int getLength() {
-		return -1;
+	public int numElements() {
+		return map.size();
 	}
 
 	@Override
-	protected Class<Decimal> getTypeClass() {
-		return Decimal.class;
+	public Map toJavaMap(LogicalType keyType, LogicalType valueType) {
+		return map;
+	}
+
+	public Object get(Object key) {
+		return map.get(key);
 	}
 
 	@Override
-	protected Decimal[] getTestData() {
-		return new Decimal[] {
-				Decimal.fromLong(1, 5, 2),
-				Decimal.fromLong(2, 5, 2),
-				Decimal.fromLong(3, 5, 2),
-				Decimal.fromLong(4, 5, 2)
-		};
+	public String toString() {
+		return map.toString();
+	}
+
+	public Map<Object, Object> getMap() {
+		return map;
 	}
 }
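
GenericMap is the map-side counterpart: toJavaMap() returns the wrapped instance directly, which is why its LogicalType parameters go unused there. A brief sketch:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.dataformat.GenericMap;

    Map<Object, Object> javaMap = new HashMap<>();
    javaMap.put(1, 100L);
    GenericMap map = new GenericMap(javaMap);
    int size = map.numElements();  // 1, delegates to Map.size()
    Object value = map.get(1);     // direct lookup on the wrapped map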
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
index a92ca48..92e29a5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
@@ -172,7 +172,7 @@ public final class JoinedRow implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int i) {
+	public BaseArray getArray(int i) {
 		if (i < row1.getArity()) {
 			return row1.getArray(i);
 		} else {
@@ -181,7 +181,7 @@ public final class JoinedRow implements BaseRow {
 	}
 
 	@Override
-	public BinaryMap getMap(int i) {
+	public BaseMap getMap(int i) {
 		if (i < row1.getArity()) {
 			return row1.getMap(i);
 		} else {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index c211e52..1ba5592 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -258,13 +258,13 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int pos) {
+	public BaseArray getArray(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos));
 	}
 
 	@Override
-	public BinaryMap getMap(int pos) {
+	public BaseMap getMap(int pos) {
 		assertIndexIsValid(pos);
 		return BinaryMap.readBinaryMapFieldFromSegments(segments, offset, getLong(pos));
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
index a2be7b2..b6f4799 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
@@ -76,13 +76,13 @@ public abstract class ObjectArrayRow implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int ordinal) {
-		return (BinaryArray) this.fields[ordinal];
+	public BaseArray getArray(int ordinal) {
+		return (BaseArray) this.fields[ordinal];
 	}
 
 	@Override
-	public BinaryMap getMap(int ordinal) {
-		return (BinaryMap) this.fields[ordinal];
+	public BaseMap getMap(int ordinal) {
+		return (BaseMap) this.fields[ordinal];
 	}
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
index 92217ed..29ed39c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
@@ -101,14 +101,14 @@ public interface TypeGetterSetters {
 	byte[] getBinary(int ordinal);
 
 	/**
-	 * Get array value, internal format is BinaryArray.
+	 * Get array value, internal format is BaseArray.
 	 */
-	BinaryArray getArray(int ordinal);
+	BaseArray getArray(int ordinal);
 
 	/**
-	 * Get map value, internal format is BinaryMap.
+	 * Get map value, internal format is BaseMap.
 	 */
-	BinaryMap getMap(int ordinal);
+	BaseMap getMap(int ordinal);
 
 	/**
 	 * Get row value, internal format is BaseRow.
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
index d385181..e67aa38 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/UpdatableRow.java
@@ -117,12 +117,12 @@ public final class UpdatableRow implements BaseRow {
 	}
 
 	@Override
-	public BinaryArray getArray(int ordinal) {
-		return updated[ordinal] ? (BinaryArray) fields[ordinal] : row.getArray(ordinal);
+	public BaseArray getArray(int ordinal) {
+		return updated[ordinal] ? (BaseArray) fields[ordinal] : row.getArray(ordinal);
 	}
 
 	@Override
-	public BinaryMap getMap(int ordinal) {
+	public BaseMap getMap(int ordinal) {
-		return updated[ordinal] ? (BinaryMap) fields[ordinal] : row.getMap(ordinal);
+		return updated[ordinal] ? (BaseMap) fields[ordinal] : row.getMap(ordinal);
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
index 9797091..eb96bdf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java
@@ -243,7 +243,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
 	public void putBuildRow(BaseRow row) throws IOException {
 		final int hashCode = hash(this.buildSideProjection.apply(row).hashCode(), 0);
 		// TODO: combine key projection and build side conversion to code gen.
-		insertIntoTable(originBuildSideSerializer.baseRowToBinary(row), hashCode);
+		insertIntoTable(originBuildSideSerializer.toBinaryRow(row), hashCode);
 	}
 
 	/**
@@ -290,7 +290,7 @@ public class BinaryHashTable extends BaseHybridHashTable {
 			return true;
 		} else {
 			if (p.testHashBloomFilter(hash)) {
-				BinaryRow row = originProbeSideSerializer.baseRowToBinary(record);
+				BinaryRow row = originProbeSideSerializer.toBinaryRow(record);
 				p.insertIntoProbeBuffer(row);
 			}
 			return false;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
index c091985..3ce0d0f 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/StreamSortOperator.java
@@ -96,7 +96,7 @@ public class StreamSortOperator extends TableStreamOperator<BaseRow> implements
 	@Override
 	public void processElement(StreamRecord<BaseRow> element) throws Exception {
 		BaseRow originalInput = element.getValue();
-		BinaryRow input = baseRowSerializer.baseRowToBinary(originalInput).copy();
+		BinaryRow input = baseRowSerializer.toBinaryRow(originalInput).copy();
 		BaseRowUtil.setAccumulate(input);
 		long count = inputBuffer.getOrDefault(input, 0L);
 		if (BaseRowUtil.isAccumulateMsg(originalInput)) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
index 9050435..a34d434 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java
@@ -19,10 +19,10 @@
 package org.apache.flink.table.types;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BaseMap;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryGeneric;
-import org.apache.flink.table.dataformat.BinaryMap;
 import org.apache.flink.table.dataformat.BinaryString;
 import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -160,10 +160,10 @@ public class ClassLogicalTypeConverter {
 			case DECIMAL:
 				return Decimal.class;
 			case ARRAY:
-				return BinaryArray.class;
+				return BaseArray.class;
 			case MAP:
 			case MULTISET:
-				return BinaryMap.class;
+				return BaseMap.class;
 			case ROW:
 				return BaseRow.class;
 			case BINARY:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
index c871038..9f2ef5b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java
@@ -28,14 +28,18 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
 import org.apache.flink.table.typeutils.BaseRowSerializer;
-import org.apache.flink.table.typeutils.BinaryArraySerializer;
 import org.apache.flink.table.typeutils.BinaryGenericSerializer;
-import org.apache.flink.table.typeutils.BinaryMapSerializer;
 import org.apache.flink.table.typeutils.BinaryStringSerializer;
 import org.apache.flink.table.typeutils.DecimalSerializer;
 
@@ -72,10 +76,12 @@ public class InternalSerializers {
 				DecimalType decimalType = (DecimalType) type;
 				return new DecimalSerializer(decimalType.getPrecision(), decimalType.getScale());
 			case ARRAY:
-				return BinaryArraySerializer.INSTANCE;
+				return new BaseArraySerializer(((ArrayType) type).getElementType(), config);
 			case MAP:
+				MapType mapType = (MapType) type;
+				return new BaseMapSerializer(mapType.getKeyType(), mapType.getValueType());
 			case MULTISET:
-				return BinaryMapSerializer.INSTANCE;
+				return new BaseMapSerializer(((MultisetType) type).getElementType(), new IntType());
 			case ROW:
 				RowType rowType = (RowType) type;
 				return new BaseRowSerializer(config, rowType);
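
Note that a MULTISET is modeled as a map from element to an int count, so both MAP and MULTISET now resolve to a BaseMapSerializer. A hedged sketch of the factory in use:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.table.types.InternalSerializers;
    import org.apache.flink.table.types.logical.ArrayType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.MultisetType;

    // ARRAY<INT> -> BaseArraySerializer over IntType.
    TypeSerializer arraySer =
            InternalSerializers.create(new ArrayType(new IntType()), new ExecutionConfig());
    // MULTISET<INT> -> BaseMapSerializer(IntType, IntType), element -> count.
    TypeSerializer multisetSer =
            InternalSerializers.create(new MultisetType(new IntType()), new ExecutionConfig());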
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index d233083..e0c9ebb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.typeutils.MapViewTypeInfo;
 import org.apache.flink.types.Row;
 
 import static org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
 
 /**
  * Converter between {@link TypeInformation} and {@link DataType}.
@@ -79,7 +80,8 @@ public class TypeInfoDataTypeConverter {
 			case VARBINARY: // ignore precision
 				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
 			case ARRAY:
-				if (dataType instanceof CollectionDataType) {
+				if (dataType instanceof CollectionDataType &&
+						!isPrimitive(((CollectionDataType) dataType).getElementDataType().getLogicalType())) {
 					return ObjectArrayTypeInfo.getInfoFor(
 							fromDataTypeToTypeInfo(((CollectionDataType) dataType).getElementDataType()));
 				} else {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
index 4d99b95..5043eb4 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/AbstractRowSerializer.java
@@ -39,7 +39,7 @@ public abstract class AbstractRowSerializer<T extends BaseRow> extends TypeSeria
 	/**
 	 * Convert a {@link BaseRow} to a {@link BinaryRow}.
 	 */
-	public abstract BinaryRow baseRowToBinary(T baseRow) throws IOException;
+	public abstract BinaryRow toBinaryRow(T baseRow) throws IOException;
 
 	/**
 	 * Serializes the given record to the given target paged output view. Some implementations may
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
new file mode 100644
index 0000000..d8bf95e
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseArraySerializer.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseArray;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericArray;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+import static org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType;
+
+/**
+ * Serializer for {@link BaseArray}.
+ */
+public class BaseArraySerializer extends TypeSerializer<BaseArray> {
+
+	private final LogicalType eleType;
+	private final TypeSerializer eleSer;
+
+	private transient BinaryArray reuseArray;
+	private transient BinaryArrayWriter reuseWriter;
+
+	public BaseArraySerializer(LogicalType eleType, ExecutionConfig conf) {
+		this.eleType = eleType;
+		this.eleSer = InternalSerializers.create(eleType, conf);
+	}
+
+	private BaseArraySerializer(LogicalType eleType, TypeSerializer eleSer) {
+		this.eleType = eleType;
+		this.eleSer = eleSer;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<BaseArray> duplicate() {
+		return new BaseArraySerializer(eleType, eleSer.duplicate());
+	}
+
+	@Override
+	public BaseArray createInstance() {
+		return new BinaryArray();
+	}
+
+	@Override
+	public BaseArray copy(BaseArray from) {
+		return from instanceof GenericArray ?
+				copyGenericArray((GenericArray) from) :
+				((BinaryArray) from).copy();
+	}
+
+	@Override
+	public BaseArray copy(BaseArray from, BaseArray reuse) {
+		return copy(from);
+	}
+
+	private GenericArray copyGenericArray(GenericArray array) {
+		Object arr;
+		if (array.isPrimitiveArray()) {
+			switch (eleType.getTypeRoot()) {
+				case BOOLEAN:
+					arr = Arrays.copyOf((boolean[]) array.getArray(), array.numElements());
+					break;
+				case TINYINT:
+					arr = Arrays.copyOf((byte[]) array.getArray(), array.numElements());
+					break;
+				case SMALLINT:
+					arr = Arrays.copyOf((short[]) array.getArray(), array.numElements());
+					break;
+				case INTEGER:
+					arr = Arrays.copyOf((int[]) array.getArray(), array.numElements());
+					break;
+				case BIGINT:
+					arr = Arrays.copyOf((long[]) array.getArray(), array.numElements());
+					break;
+				case FLOAT:
+					arr = Arrays.copyOf((float[]) array.getArray(), array.numElements());
+					break;
+				case DOUBLE:
+					arr = Arrays.copyOf((double[]) array.getArray(), array.numElements());
+					break;
+				default:
+					throw new RuntimeException("Unknown type: " + eleType);
+			}
+		} else {
+			Object[] objectArray = (Object[]) array.getArray();
+			Object[] newArray = (Object[]) Array.newInstance(
+					getInternalClassForType(eleType),
+					array.numElements());
+			for (int i = 0; i < array.numElements(); i++) {
+				newArray[i] = eleSer.copy(objectArray[i]);
+			}
+			arr = newArray;
+		}
+		return new GenericArray(arr, array.numElements(), array.isPrimitiveArray());
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(BaseArray record, DataOutputView target) throws IOException {
+		BinaryArray binaryArray = toBinaryArray(record);
+		target.writeInt(binaryArray.getSizeInBytes());
+		SegmentsUtil.copyToView(binaryArray.getSegments(), binaryArray.getOffset(), binaryArray.getSizeInBytes(), target);
+	}
+
+	public BinaryArray toBinaryArray(BaseArray from) {
+		if (from instanceof BinaryArray) {
+			return (BinaryArray) from;
+		}
+
+		int numElements = from.numElements();
+		if (reuseArray == null) {
+			reuseArray = new BinaryArray();
+		}
+		if (reuseWriter == null || reuseWriter.getNumElements() != numElements) {
+			reuseWriter = new BinaryArrayWriter(
+					reuseArray, numElements, BinaryArray.calculateFixLengthPartSize(eleType));
+		} else {
+			reuseWriter.reset();
+		}
+
+		for (int i = 0; i < numElements; i++) {
+			if (from.isNullAt(i)) {
+				reuseWriter.setNullAt(i, eleType);
+			} else {
+				BinaryWriter.write(reuseWriter, i, TypeGetterSetters.get(from, i, eleType), eleType, eleSer);
+			}
+		}
+		reuseWriter.complete();
+
+		return reuseArray;
+	}
+
+	@Override
+	public BaseArray deserialize(DataInputView source) throws IOException {
+		return deserializeReuse(new BinaryArray(), source);
+	}
+
+	@Override
+	public BaseArray deserialize(BaseArray reuse, DataInputView source) throws IOException {
+		return deserializeReuse(reuse instanceof GenericArray ? new BinaryArray() : (BinaryArray) reuse, source);
+	}
+
+	private BinaryArray deserializeReuse(BinaryArray reuse, DataInputView source) throws IOException {
+		int length = source.readInt();
+		byte[] bytes = new byte[length];
+		source.readFully(bytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int length = source.readInt();
+		target.writeInt(length);
+		target.write(source, length);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		BaseArraySerializer that = (BaseArraySerializer) o;
+
+		return eleType.equals(that.eleType);
+	}
+
+	@Override
+	public int hashCode() {
+		return eleType.hashCode();
+	}
+
+	@Override
+	public TypeSerializerSnapshot<BaseArray> snapshotConfiguration() {
+		return new BaseArraySerializerSnapshot(eleType);
+	}
+
+	/**
+	 * {@link TypeSerializerSnapshot} for {@link BaseArraySerializer}.
+	 */
+	public static final class BaseArraySerializerSnapshot implements TypeSerializerSnapshot<BaseArray> {
+		private static final int CURRENT_VERSION = 3;
+
+		private LogicalType previousType;
+
+		@SuppressWarnings("unused")
+		public BaseArraySerializerSnapshot() {
+			// this constructor is used when restoring from a checkpoint/savepoint.
+		}
+
+		BaseArraySerializerSnapshot(LogicalType eleType) {
+			this.previousType = eleType;
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return CURRENT_VERSION;
+		}
+
+		@Override
+		public void writeSnapshot(DataOutputView out) throws IOException {
+			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousType);
+		}
+
+		@Override
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			try {
+				this.previousType = InstantiationUtil.deserializeObject(
+						new DataInputViewStream(in), userCodeClassLoader);
+			} catch (ClassNotFoundException e) {
+				throw new IOException(e);
+			}
+		}
+
+		@Override
+		public TypeSerializer<BaseArray> restoreSerializer() {
+			return new BaseArraySerializer(previousType, new ExecutionConfig());
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<BaseArray> resolveSchemaCompatibility(TypeSerializer<BaseArray> newSerializer) {
+			if (!(newSerializer instanceof BaseArraySerializer)) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+
+			BaseArraySerializer newBaseArraySerializer = (BaseArraySerializer) newSerializer;
+			if (!previousType.equals(newBaseArraySerializer.eleType)) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			} else {
+				return TypeSerializerSchemaCompatibility.compatibleAsIs();
+			}
+		}
+	}
+}
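
BaseArraySerializer always writes the binary form: serialize() normalizes a GenericArray via toBinaryArray() and emits length-prefixed bytes, so deserialize() always yields a BinaryArray even when a GenericArray went in. A round-trip sketch using Flink's in-memory data views (chosen here for illustration):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;
    import org.apache.flink.table.dataformat.BaseArray;
    import org.apache.flink.table.dataformat.GenericArray;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.typeutils.BaseArraySerializer;

    BaseArraySerializer ser = new BaseArraySerializer(new IntType(), new ExecutionConfig());
    DataOutputSerializer out = new DataOutputSerializer(64);
    ser.serialize(new GenericArray(new int[]{1, 2, 3}, 3), out);
    BaseArray back = ser.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
    // back is a BinaryArray; back.getInt(2) == 3.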
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
new file mode 100644
index 0000000..01dba56
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseMapSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryWriter;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializer for {@link BaseMap}.
+ */
+public class BaseMapSerializer extends TypeSerializer<BaseMap> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BaseMapSerializer.class);
+
+	private final LogicalType keyType;
+	private final LogicalType valueType;
+
+	private final TypeSerializer keySerializer;
+	private final TypeSerializer valueSerializer;
+
+	private transient BinaryArray reuseKeyArray;
+	private transient BinaryArray reuseValueArray;
+	private transient BinaryArrayWriter reuseKeyWriter;
+	private transient BinaryArrayWriter reuseValueWriter;
+
+	public BaseMapSerializer(LogicalType keyType, LogicalType valueType) {
+		this.keyType = keyType;
+		this.valueType = valueType;
+
+		this.keySerializer = InternalSerializers.create(keyType, new ExecutionConfig());
+		this.valueSerializer = InternalSerializers.create(valueType, new ExecutionConfig());
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<BaseMap> duplicate() {
+		return new BaseMapSerializer(keyType, valueType);
+	}
+
+	@Override
+	public BaseMap createInstance() {
+		return new BinaryMap();
+	}
+
+	/**
+	 * NOTE: the map should be a HashMap; if the key/value pairs of a TreeMap are inserted
+	 * into a HashMap, problems may occur.
+	 */
+	@Override
+	public BaseMap copy(BaseMap from) {
+		if (from instanceof GenericMap) {
+			Map<Object, Object> fromMap = ((GenericMap) from).getMap();
+			if (!(fromMap instanceof HashMap) && LOG.isDebugEnabled()) {
+				LOG.debug("It is dangerous to copy a non-HashMap to a HashMap.");
+			}
+			HashMap<Object, Object> toMap = new HashMap<>();
+			for (Map.Entry<Object, Object> entry : fromMap.entrySet()) {
+				toMap.put(
+						keySerializer.copy(entry.getKey()),
+						valueSerializer.copy(entry.getValue()));
+			}
+			return new GenericMap(toMap);
+		} else {
+			return ((BinaryMap) from).copy();
+		}
+	}
+
+	@Override
+	public BaseMap copy(BaseMap from, BaseMap reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(BaseMap record, DataOutputView target) throws IOException {
+		BinaryMap binaryMap = toBinaryMap(record);
+		target.writeInt(binaryMap.getSizeInBytes());
+		SegmentsUtil.copyToView(binaryMap.getSegments(), binaryMap.getOffset(), binaryMap.getSizeInBytes(), target);
+	}
+
+	public BinaryMap toBinaryMap(BaseMap from) {
+		if (from instanceof BinaryMap) {
+			return (BinaryMap) from;
+		}
+
+		Map<Object, Object> javaMap = ((GenericMap) from).getMap();
+		int numElements = javaMap.size();
+		if (reuseKeyArray == null) {
+			reuseKeyArray = new BinaryArray();
+		}
+		if (reuseValueArray == null) {
+			reuseValueArray = new BinaryArray();
+		}
+		if (reuseKeyWriter == null || reuseKeyWriter.getNumElements() != numElements) {
+			reuseKeyWriter = new BinaryArrayWriter(
+					reuseKeyArray, numElements, BinaryArray.calculateFixLengthPartSize(keyType));
+		} else {
+			reuseKeyWriter.reset();
+		}
+		if (reuseValueWriter == null || reuseValueWriter.getNumElements() != numElements) {
+			reuseValueWriter = new BinaryArrayWriter(
+					reuseValueArray, numElements, BinaryArray.calculateFixLengthPartSize(valueType));
+		} else {
+			reuseValueWriter.reset();
+		}
+
+		int i = 0;
+		for (Map.Entry<Object, Object> entry : javaMap.entrySet()) {
+			if (entry.getKey() == null) {
+				reuseKeyWriter.setNullAt(i, keyType);
+			} else {
+				BinaryWriter.write(reuseKeyWriter, i, entry.getKey(), keyType, keySerializer);
+			}
+			if (entry.getValue() == null) {
+				reuseValueWriter.setNullAt(i, valueType);
+			} else {
+				BinaryWriter.write(reuseValueWriter, i, entry.getValue(), valueType, valueSerializer);
+			}
+			i++;
+		}
+		reuseKeyWriter.complete();
+		reuseValueWriter.complete();
+
+		return BinaryMap.valueOf(reuseKeyArray, reuseValueArray);
+	}
+
+	@Override
+	public BaseMap deserialize(DataInputView source) throws IOException {
+		return deserializeReuse(new BinaryMap(), source);
+	}
+
+	@Override
+	public BaseMap deserialize(BaseMap reuse, DataInputView source) throws IOException {
+		return deserializeReuse(reuse instanceof GenericMap ? new BinaryMap() : (BinaryMap) reuse, source);
+	}
+
+	private BinaryMap deserializeReuse(BinaryMap reuse, DataInputView source) throws IOException {
+		int length = source.readInt();
+		byte[] bytes = new byte[length];
+		source.readFully(bytes);
+		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+		return reuse;
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int length = source.readInt();
+		target.writeInt(length);
+		target.write(source, length);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		BaseMapSerializer that = (BaseMapSerializer) o;
+
+		return keyType.equals(that.keyType) && valueType.equals(that.valueType);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = keyType.hashCode();
+		result = 31 * result + valueType.hashCode();
+		return result;
+	}
+
+	@Override
+	public TypeSerializerSnapshot<BaseMap> snapshotConfiguration() {
+		return new BaseMapSerializerSnapshot(keyType, valueType);
+	}
+
+	/**
+	 * {@link TypeSerializerSnapshot} for {@link BaseMapSerializer}.
+	 */
+	public static final class BaseMapSerializerSnapshot implements TypeSerializerSnapshot<BaseMap> {
+		private static final int CURRENT_VERSION = 3;
+
+		private LogicalType previousKeyType;
+		private LogicalType previousValueType;
+
+		@SuppressWarnings("unused")
+		public BaseMapSerializerSnapshot() {
+			// this constructor is used when restoring from a checkpoint/savepoint.
+		}
+
+		BaseMapSerializerSnapshot(LogicalType keyT, LogicalType valueT) {
+			this.previousKeyType = keyT;
+			this.previousValueType = valueT;
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return CURRENT_VERSION;
+		}
+
+		@Override
+		public void writeSnapshot(DataOutputView out) throws IOException {
+			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousKeyType);
+			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousValueType);
+		}
+
+		@Override
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			try {
+				this.previousKeyType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+				this.previousValueType = InstantiationUtil.deserializeObject(new DataInputViewStream(in), userCodeClassLoader);
+			} catch (ClassNotFoundException e) {
+				throw new IOException(e);
+			}
+		}
+
+		@Override
+		public TypeSerializer<BaseMap> restoreSerializer() {
+			return new BaseMapSerializer(previousKeyType, previousValueType);
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<BaseMap> resolveSchemaCompatibility(TypeSerializer<BaseMap> newSerializer) {
+			if (!(newSerializer instanceof BaseMapSerializer)) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+
+			BaseMapSerializer newBaseMapSerializer = (BaseMapSerializer) newSerializer;
+			if (!previousKeyType.equals(newBaseMapSerializer.keyType) ||
+					!previousValueType.equals(newBaseMapSerializer.valueType)) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			} else {
+				return TypeSerializerSchemaCompatibility.compatibleAsIs();
+			}
+		}
+	}
+}
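
A quick round-trip sketch of the new serializer (editorial example, not part of the patch;
DataOutputSerializer and DataInputDeserializer are the standard flink-core in-memory views,
everything else is taken from the class above):

	BaseMapSerializer serializer = new BaseMapSerializer(
			DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType());

	Map<Object, Object> javaMap = new HashMap<>();
	javaMap.put(1, BinaryString.fromString("a"));

	DataOutputSerializer out = new DataOutputSerializer(64);
	// serialize() converts the GenericMap via toBinaryMap(), then writes a
	// length-prefixed byte blob
	serializer.serialize(new GenericMap(javaMap), out);

	DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
	BaseMap copy = serializer.deserialize(in);   // always materializes as a BinaryMap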
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
index 799f803..4bf0546 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowSerializer.java
@@ -53,6 +53,9 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
 	private final LogicalType[] types;
 	private final TypeSerializer[] fieldSerializers;
 
+	private transient BinaryRow reuseRow;
+	private transient BinaryRowWriter reuseWriter;
+
 	public BaseRowSerializer(ExecutionConfig config, RowType rowType) {
 		this(rowType.getChildren().toArray(new LogicalType[0]),
 			rowType.getChildren().stream()
@@ -85,7 +88,7 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
 
 	@Override
 	public void serialize(BaseRow row, DataOutputView target) throws IOException {
-		binarySerializer.serialize(baseRowToBinary(row), target);
+		binarySerializer.serialize(toBinaryRow(row), target);
 	}
 
 	@Override
@@ -165,49 +168,33 @@ public class BaseRowSerializer extends AbstractRowSerializer<BaseRow> {
 
 	/**
 	 * Convert base row to binary row.
-	 * TODO modify it to code gen, and reuse BinaryRow&BinaryRowWriter.
+	 * TODO modify it to code gen.
 	 */
 	@Override
-	public BinaryRow baseRowToBinary(BaseRow row) {
+	public BinaryRow toBinaryRow(BaseRow row) {
 		if (row instanceof BinaryRow) {
 			return (BinaryRow) row;
 		}
-		BinaryRow binaryRow = new BinaryRow(types.length);
-		BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
-		writer.writeHeader(row.getHeader());
-		for (int i = 0; i < types.length; i++) {
-			if (row.isNullAt(i)) {
-				writer.setNullAt(i);
-			} else {
-				BinaryWriter.write(writer, i, TypeGetterSetters.get(row, i, types[i]), types[i]);
-			}
+		if (reuseRow == null) {
+			reuseRow = new BinaryRow(types.length);
+			reuseWriter = new BinaryRowWriter(reuseRow);
 		}
-		writer.complete();
-		return binaryRow;
-	}
-
-	public static BinaryRow baseRowToBinary(BaseRow row, RowType type) {
-		if (row instanceof BinaryRow) {
-			return (BinaryRow) row;
-		}
-		BinaryRow binaryRow = new BinaryRow(type.getFields().size());
-		BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
-		writer.writeHeader(row.getHeader());
-		for (int i = 0; i < binaryRow.getArity(); i++) {
+		reuseWriter.reset();
+		reuseWriter.writeHeader(row.getHeader());
+		for (int i = 0; i < types.length; i++) {
 			if (row.isNullAt(i)) {
-				writer.setNullAt(i);
+				reuseWriter.setNullAt(i);
 			} else {
-				LogicalType fieldType = type.getFields().get(i).getType();
-				BinaryWriter.write(writer, i, TypeGetterSetters.get(row, i, fieldType), fieldType);
+				BinaryWriter.write(reuseWriter, i, TypeGetterSetters.get(row, i, types[i]), types[i], fieldSerializers[i]);
 			}
 		}
-		writer.complete();
-		return binaryRow;
+		reuseWriter.complete();
+		return reuseRow;
 	}
 
 	@Override
 	public int serializeToPages(BaseRow row, AbstractPagedOutputView target) throws IOException {
-		return binarySerializer.serializeToPages(baseRowToBinary(row), target);
+		return binarySerializer.serializeToPages(toBinaryRow(row), target);
 	}
 
 	@Override
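
Note that toBinaryRow() now writes into one reused BinaryRow/BinaryRowWriter pair, so the
returned row is only valid until the next call; this is also why the updated
BaseRowSerializerTest below duplicates the serializer before converting both sides of a
comparison. A hedged illustration of the caveat for callers that keep the result:

	BinaryRow first = serializer.toBinaryRow(rowA).copy();   // copy() detaches from the shared buffer
	BinaryRow second = serializer.toBinaryRow(rowB);         // reuses (and overwrites) that buffer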
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
deleted file mode 100644
index 7d988f3..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryArraySerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.	See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.	You may obtain a copy of the License at
- *
- *		http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.dataformat.BinaryArray;
-import org.apache.flink.table.util.SegmentsUtil;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link BinaryArray}.
- */
-public class BinaryArraySerializer extends TypeSerializerSingleton<BinaryArray> {
-
-	public static final BinaryArraySerializer INSTANCE = new BinaryArraySerializer();
-
-	private BinaryArraySerializer() {}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public BinaryArray createInstance() {
-		return new BinaryArray();
-	}
-
-	@Override
-	public BinaryArray copy(BinaryArray from) {
-		return from.copy();
-	}
-
-	@Override
-	public BinaryArray copy(BinaryArray from, BinaryArray reuse) {
-		return from.copy(reuse);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(BinaryArray record, DataOutputView target) throws IOException {
-		target.writeInt(record.getSizeInBytes());
-		SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
-	}
-
-	@Override
-	public BinaryArray deserialize(DataInputView source) throws IOException {
-		return deserialize(new BinaryArray(), source);
-	}
-
-	@Override
-	public BinaryArray deserialize(BinaryArray reuse, DataInputView source) throws IOException {
-		int length = source.readInt();
-		byte[] bytes = new byte[length];
-		source.readFully(bytes);
-		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int length = source.readInt();
-		target.writeInt(length);
-		target.write(source, length);
-	}
-
-	@Override
-	public TypeSerializerSnapshot<BinaryArray> snapshotConfiguration() {
-		return new BinaryArraySerializerSnapshot();
-	}
-
-	/**
-	 * Serializer configuration snapshot for compatibility and format evolution.
-	 */
-	@SuppressWarnings("WeakerAccess")
-	public static final class BinaryArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<BinaryArray> {
-
-		public BinaryArraySerializerSnapshot() {
-			super(() -> INSTANCE);
-		}
-	}
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
deleted file mode 100644
index e46e066..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryMapSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.	See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.	You may obtain a copy of the License at
- *
- *		http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.table.dataformat.BinaryMap;
-import org.apache.flink.table.util.SegmentsUtil;
-
-import java.io.IOException;
-
-/**
- * Serializer for {@link BinaryMap}.
- */
-public class BinaryMapSerializer extends TypeSerializerSingleton<BinaryMap> {
-
-	public static final BinaryMapSerializer INSTANCE = new BinaryMapSerializer();
-
-	private BinaryMapSerializer() {}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public BinaryMap createInstance() {
-		return new BinaryMap();
-	}
-
-	@Override
-	public BinaryMap copy(BinaryMap from) {
-		return from.copy();
-	}
-
-	@Override
-	public BinaryMap copy(BinaryMap from, BinaryMap reuse) {
-		return from.copy(reuse);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(BinaryMap record, DataOutputView target) throws IOException {
-		target.writeInt(record.getSizeInBytes());
-		SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
-	}
-
-	@Override
-	public BinaryMap deserialize(DataInputView source) throws IOException {
-		return deserialize(new BinaryMap(), source);
-	}
-
-	@Override
-	public BinaryMap deserialize(BinaryMap reuse, DataInputView source) throws IOException {
-		int length = source.readInt();
-		byte[] bytes = new byte[length];
-		source.readFully(bytes);
-		reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int length = source.readInt();
-		target.writeInt(length);
-		target.write(source, length);
-	}
-
-	@Override
-	public TypeSerializerSnapshot<BinaryMap> snapshotConfiguration() {
-		return new BinaryMapSerializerSnapshot();
-	}
-
-	/**
-	 * Serializer configuration snapshot for compatibility and format evolution.
-	 */
-	@SuppressWarnings("WeakerAccess")
-	public static final class BinaryMapSerializerSnapshot extends SimpleTypeSerializerSnapshot<BinaryMap> {
-
-		public BinaryMapSerializerSnapshot() {
-			super(() -> INSTANCE);
-		}
-	}
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
index 9804767..c187471 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryRowSerializer.java
@@ -124,7 +124,7 @@ public class BinaryRowSerializer extends AbstractRowSerializer<BinaryRow> {
 	}
 
 	@Override
-	public BinaryRow baseRowToBinary(BinaryRow baseRow) throws IOException {
+	public BinaryRow toBinaryRow(BinaryRow baseRow) throws IOException {
 		return baseRow;
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index dd6350a..43bc6a6 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -18,8 +18,12 @@
 package org.apache.flink.table.dataformat;
 
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -97,9 +101,9 @@ public class BaseRowTest {
 		writer.writeGeneric(9, generic);
 		writer.writeDecimal(10, decimal1, 5);
 		writer.writeDecimal(11, decimal2, 20);
-		writer.writeArray(12, array);
-		writer.writeMap(13, map);
-		writer.writeRow(14, underRow, RowType.of(new IntType(), new IntType()));
+		writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
+		writer.writeMap(13, map, new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
+		writer.writeRow(14, underRow, new BaseRowSerializer(null, RowType.of(new IntType(), new IntType())));
 		writer.writeBinary(15, bytes);
 		return row;
 	}
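
The writer API change driving most of these test updates: writeArray/writeMap/writeRow now
take a serializer so that generic (non-binary) elements can be converted on the fly. The new
call shape, as used above (the null ExecutionConfig mirrors the test's own usage):

	writer.writeArray(12, array,
			new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));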
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index d7e8147..54d0802 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -21,9 +21,13 @@ package org.apache.flink.table.dataformat;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.util.SegmentsUtil;
 
 import org.junit.Assert;
@@ -59,10 +63,10 @@ public class BinaryArrayTest {
 		{
 			BinaryRow row2 = new BinaryRow(1);
 			BinaryRowWriter writer2 = new BinaryRowWriter(row2);
-			writer2.writeArray(0, array);
+			writer2.writeArray(0, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
 			writer2.complete();
 
-			BinaryArray array2 = row2.getArray(0);
+			BinaryArray array2 = (BinaryArray) row2.getArray(0);
 			assertEquals(array2, array);
 			assertEquals(array2.getInt(0), 6);
 			assertTrue(array2.isNullAt(1));
@@ -75,10 +79,10 @@ public class BinaryArrayTest {
 
 			BinaryRow row2 = new BinaryRow(1);
 			BinaryRowWriter writer2 = new BinaryRowWriter(row2);
-			writer2.writeArray(0, array3);
+			writer2.writeArray(0, array3, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
 			writer2.complete();
 
-			BinaryArray array2 = row2.getArray(0);
+			BinaryArray array2 = (BinaryArray) row2.getArray(0);
 			assertEquals(array2, array);
 			assertEquals(array2.getInt(0), 6);
 			assertTrue(array2.isNullAt(1));
@@ -304,7 +308,7 @@ public class BinaryArrayTest {
 			BinaryArray array = new BinaryArray();
 			BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 			writer.setNullAt(0);
-			writer.writeArray(1, subArray);
+			writer.writeArray(1, subArray, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
 			writer.complete();
 
 			assertTrue(array.isNullAt(0));
@@ -320,7 +324,8 @@ public class BinaryArrayTest {
 			BinaryArray array = new BinaryArray();
 			BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
 			writer.setNullAt(0);
-			writer.writeMap(1, BinaryMap.valueOf(subArray, subArray));
+			writer.writeMap(1, BinaryMap.valueOf(subArray, subArray),
+					new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
 			writer.complete();
 
 			assertTrue(array.isNullAt(0));
@@ -352,10 +357,11 @@ public class BinaryArrayTest {
 
 		BinaryRow row = new BinaryRow(1);
 		BinaryRowWriter rowWriter = new BinaryRowWriter(row);
-		rowWriter.writeMap(0, binaryMap);
+		rowWriter.writeMap(0, binaryMap,
+				new BaseMapSerializer(DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()));
 		rowWriter.complete();
 
-		BinaryMap map = row.getMap(0);
+		BinaryMap map = (BinaryMap) row.getMap(0);
 		BinaryArray key = map.keyArray();
 		BinaryArray value = map.valueArray();
 
@@ -468,7 +474,8 @@ public class BinaryArrayTest {
 	public void testNested() {
 		BinaryArray array = new BinaryArray();
 		BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
-		writer.writeRow(0, GenericRow.of(fromString("1"), 1), RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
+		writer.writeRow(0, GenericRow.of(fromString("1"), 1),
+				new BaseRowSerializer(null, RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
 		writer.setNullAt(1);
 		writer.complete();
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index 0be17dd..b4a6387 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 
 import org.junit.Assert;
@@ -431,7 +432,8 @@ public class BinaryRowTest {
 	public void testNested() {
 		BinaryRow row = new BinaryRow(2);
 		BinaryRowWriter writer = new BinaryRowWriter(row);
-		writer.writeRow(0, GenericRow.of(fromString("1"), 1), RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()));
+		writer.writeRow(0, GenericRow.of(fromString("1"), 1),
+				new BaseRowSerializer(null, RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType())));
 		writer.setNullAt(1);
 		writer.complete();
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
index 95c3549..c76b3e5 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/BinaryRowKeySelector.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.table.runtime.util;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowWriter;
 import org.apache.flink.table.dataformat.BinaryWriter;
 import org.apache.flink.table.dataformat.TypeGetterSetters;
 import org.apache.flink.table.runtime.keyselector.BaseRowKeySelector;
+import org.apache.flink.table.types.InternalSerializers;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.typeutils.BaseRowTypeInfo;
 
@@ -37,13 +40,17 @@ public class BinaryRowKeySelector implements BaseRowKeySelector {
 	private final int[] keyFields;
 	private final LogicalType[] inputFieldTypes;
 	private final LogicalType[] keyFieldTypes;
+	private final TypeSerializer[] keySers;
 
 	public BinaryRowKeySelector(int[] keyFields, LogicalType[] inputFieldTypes) {
 		this.keyFields = keyFields;
 		this.inputFieldTypes = inputFieldTypes;
 		this.keyFieldTypes = new LogicalType[keyFields.length];
+		this.keySers = new TypeSerializer[keyFields.length];
+		ExecutionConfig conf = new ExecutionConfig();
 		for (int i = 0; i < keyFields.length; ++i) {
 			keyFieldTypes[i] = inputFieldTypes[keyFields[i]];
+			keySers[i] = InternalSerializers.create(keyFieldTypes[i], conf);
 		}
 	}
 
@@ -59,7 +66,8 @@ public class BinaryRowKeySelector implements BaseRowKeySelector {
 						writer,
 						i,
 						TypeGetterSetters.get(value, keyFields[i], inputFieldTypes[keyFields[i]]),
-						inputFieldTypes[keyFields[i]]);
+						inputFieldTypes[keyFields[i]],
+						keySers[i]);
 			}
 		}
 		writer.complete();
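
The selector now pre-creates one internal serializer per key field via InternalSerializers,
since BinaryWriter.write() requires one for nested types. A hedged usage sketch, assuming the
usual KeySelector contract (BaseRow in, BaseRow out):

	BinaryRowKeySelector selector = new BinaryRowKeySelector(
			new int[] {0},
			new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)});
	BaseRow key = selector.getKey(inputRow);   // a BinaryRow holding only field 0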
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
similarity index 53%
rename from flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
rename to flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
index d1e5b35..feb9893 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseArraySerializerTest.java
@@ -18,19 +18,49 @@
 
 package org.apache.flink.table.typeutils;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseArray;
 import org.apache.flink.table.dataformat.BinaryArray;
 import org.apache.flink.table.dataformat.BinaryArrayWriter;
 import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.GenericArray;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BaseArraySerializer}.
  */
-public class BinaryArraySerializerTest extends SerializerTestBase<BinaryArray> {
+public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
+
+	public BaseArraySerializerTest() {
+		super(new DeeplyEqualsChecker().withCustomCheck(
+				(o1, o2) -> o1 instanceof BaseArray && o2 instanceof BaseArray,
+				(o1, o2, checker) -> {
+					BaseArray array1 = (BaseArray) o1;
+					BaseArray array2 = (BaseArray) o2;
+					if (array1.numElements() != array2.numElements()) {
+						return false;
+					}
+					for (int i = 0; i < array1.numElements(); i++) {
+						// null flags must match at every position
+						if (array1.isNullAt(i) != array2.isNullAt(i)) {
+							return false;
+						}
+						// compare values only when both sides are non-null
+						if (!array1.isNullAt(i)
+								&& !array1.getString(i).equals(array2.getString(i))) {
+							return false;
+						}
+					}
+					return true;
+				}
+		));
+	}
 
 	@Override
-	protected BinaryArraySerializer createSerializer() {
-		return BinaryArraySerializer.INSTANCE;
+	protected BaseArraySerializer createSerializer() {
+		return new BaseArraySerializer(DataTypes.STRING().getLogicalType(), new ExecutionConfig());
 	}
 
 	@Override
@@ -39,14 +69,14 @@ public class BinaryArraySerializerTest extends SerializerTestBase<BinaryArray> {
 	}
 
 	@Override
-	protected Class<BinaryArray> getTypeClass() {
-		return BinaryArray.class;
+	protected Class<BaseArray> getTypeClass() {
+		return BaseArray.class;
 	}
 
 	@Override
-	protected BinaryArray[] getTestData() {
-		return new BinaryArray[] {
-				createArray("11"),
+	protected BaseArray[] getTestData() {
+		return new BaseArray[] {
+				new GenericArray(new BinaryString[] {BinaryString.fromString("11")}, 1),
 				createArray("11", "haa"),
 				createArray("11", "haa", "ke"),
 				createArray("11", "haa", "ke"),
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
new file mode 100644
index 0000000..38216dc
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseMapSerializerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.dataformat.BaseMap;
+import org.apache.flink.table.dataformat.BinaryArray;
+import org.apache.flink.table.dataformat.BinaryArrayWriter;
+import org.apache.flink.table.dataformat.BinaryMap;
+import org.apache.flink.table.dataformat.BinaryString;
+import org.apache.flink.table.dataformat.GenericMap;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A test for the {@link BaseMapSerializer}.
+ */
+public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
+
+	private static final LogicalType INT = DataTypes.INT().getLogicalType();
+	private static final LogicalType STRING = DataTypes.STRING().getLogicalType();
+
+	public BaseMapSerializerTest() {
+		super(new DeeplyEqualsChecker().withCustomCheck(
+				(o1, o2) -> o1 instanceof BaseMap && o2 instanceof BaseMap,
+				(o1, o2, checker) ->
+						// It is more reliable to compare the maps after converting them to Java
+						// maps instead of binary maps. For example, consider the following two maps:
+						// {1: 'a', 2: 'b', 3: 'c'} and {3: 'c', 2: 'b', 1: 'a'}
+						// These are actually the same map, but their key / value order will be
+						// different when stored as binary maps, so a byte-wise equals of the
+						// binary maps would return false.
+						((BaseMap) o1).toJavaMap(INT, STRING)
+								.equals(((BaseMap) o2).toJavaMap(INT, STRING))
+		));
+	}
+
+	private static BaseMapSerializer newSer() {
+		return new BaseMapSerializer(INT, STRING);
+	}
+
+	@Override
+	protected BaseMapSerializer createSerializer() {
+		return newSer();
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<BaseMap> getTypeClass() {
+		return BaseMap.class;
+	}
+
+	@Override
+	protected BaseMap[] getTestData() {
+		Map<Object, Object> first = new HashMap<>();
+		first.put(1, BinaryString.fromString(""));
+		return new BaseMap[] {
+				new GenericMap(first),
+				BinaryMap.valueOf(createArray(1, 2), BaseArraySerializerTest.createArray("11", "haa")),
+				BinaryMap.valueOf(createArray(1, 3, 4), BaseArraySerializerTest.createArray("11", "haa", "ke")),
+				BinaryMap.valueOf(createArray(1, 4, 2), BaseArraySerializerTest.createArray("11", "haa", "ke")),
+				BinaryMap.valueOf(createArray(1, 5, 6, 7), BaseArraySerializerTest.createArray("11", "lele", "haa", "ke"))
+		};
+	}
+
+	private static BinaryArray createArray(int... vs) {
+		BinaryArray array = new BinaryArray();
+		BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 4);
+		for (int i = 0; i < vs.length; i++) {
+			writer.writeInt(i, vs[i]);
+		}
+		writer.complete();
+		return array;
+	}
+}
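
The ordering issue the custom checker works around, illustrated with the helpers above
(hedged: BinaryMap equality is byte-wise, so it depends on entry order, while the Java-map
view does not):

	BinaryMap m1 = BinaryMap.valueOf(createArray(1, 2), BaseArraySerializerTest.createArray("a", "b"));
	BinaryMap m2 = BinaryMap.valueOf(createArray(2, 1), BaseArraySerializerTest.createArray("b", "a"));
	// m1.equals(m2) compares raw bytes and is false, but
	// m1.toJavaMap(INT, STRING).equals(m2.toJavaMap(INT, STRING)) is true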
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
index 9295973..c392c19 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BaseRowSerializerTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.testutils.DeeplyEqualsChecker;
 
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -62,7 +63,9 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
 			new DeeplyEqualsChecker()
 				.withCustomCheck(
 					(o1, o2) -> o1 instanceof BaseRow && o2 instanceof BaseRow,
-					(o1, o2, checker) -> deepEqualsBaseRow((BaseRow) o1, (BaseRow) o2, serializer)
+						(o1, o2, checker) -> deepEqualsBaseRow((BaseRow) o1, (BaseRow) o2,
+								(BaseRowSerializer) serializer.duplicate(),
+								(BaseRowSerializer) serializer.duplicate())
 				),
 			serializer,
 			BaseRow.class,
@@ -182,39 +185,44 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
 		return row;
 	}
 
-	private static boolean deepEqualsBaseRow(BaseRow should, BaseRow is, BaseRowSerializer serializer) {
+	private static boolean deepEqualsBaseRow(BaseRow should, BaseRow is,
+			BaseRowSerializer serializer1, BaseRowSerializer serializer2) {
 		if (should.getArity() != is.getArity()) {
 			return false;
 		}
+		BinaryRow row1 = serializer1.toBinaryRow(should);
+		BinaryRow row2 = serializer2.toBinaryRow(is);
 
-		return Objects.equals(serializer.baseRowToBinary(should), serializer.baseRowToBinary(is));
+		return Objects.equals(row1, row2);
 	}
 
-	private boolean deepEquals(BaseRow should, BaseRow is) {
-		return deepEqualsBaseRow(should, is, serializer);
+	private void checkDeepEquals(BaseRow should, BaseRow is) {
+		boolean equals = deepEqualsBaseRow(should, is,
+				(BaseRowSerializer) serializer.duplicate(), (BaseRowSerializer) serializer.duplicate());
+		Assert.assertTrue(equals);
 	}
 
 	@Test
 	public void testCopy() {
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(row));
+			checkDeepEquals(row, serializer.copy(row));
 		}
 
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(row, new GenericRow(row.getArity())));
+			checkDeepEquals(row, serializer.copy(row, new GenericRow(row.getArity())));
 		}
 
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(serializer.baseRowToBinary(row),
+			checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row),
 					new GenericRow(row.getArity())));
 		}
 
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(serializer.baseRowToBinary(row)));
+			checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row)));
 		}
 
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(serializer.baseRowToBinary(row),
+			checkDeepEquals(row, serializer.copy(serializer.toBinaryRow(row),
 					new BinaryRow(row.getArity())));
 		}
 	}
@@ -229,7 +237,7 @@ public class BaseRowSerializerTest extends SerializerTestInstance<BaseRow> {
 	public void testWrongCopyReuse() {
 		thrown.expect(IllegalArgumentException.class);
 		for (BaseRow row : testData) {
-			deepEquals(row, serializer.copy(row, new GenericRow(row.getArity() + 1)));
+			checkDeepEquals(row, serializer.copy(row, new GenericRow(row.getArity() + 1)));
 		}
 	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
index dc51eb8..f8fc22b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryGenericSerializerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.dataformat.BinaryGeneric;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BinaryGenericSerializer}.
  */
 public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneric<String>> {
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
deleted file mode 100644
index fe6b4e5..0000000
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryMapSerializerTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.table.dataformat.BinaryArray;
-import org.apache.flink.table.dataformat.BinaryArrayWriter;
-import org.apache.flink.table.dataformat.BinaryMap;
-
-/**
- * A test for the {@link BinaryArraySerializer}.
- */
-public class BinaryMapSerializerTest extends SerializerTestBase<BinaryMap> {
-
-	@Override
-	protected BinaryMapSerializer createSerializer() {
-		return BinaryMapSerializer.INSTANCE;
-	}
-
-	@Override
-	protected int getLength() {
-		return -1;
-	}
-
-	@Override
-	protected Class<BinaryMap> getTypeClass() {
-		return BinaryMap.class;
-	}
-
-	@Override
-	protected BinaryMap[] getTestData() {
-		return new BinaryMap[] {
-				BinaryMap.valueOf(createArray(1), BinaryArraySerializerTest.createArray("11")),
-				BinaryMap.valueOf(createArray(1, 2), BinaryArraySerializerTest.createArray("11", "haa")),
-				BinaryMap.valueOf(createArray(1, 3, 4), BinaryArraySerializerTest.createArray("11", "haa", "ke")),
-				BinaryMap.valueOf(createArray(1, 4, 2), BinaryArraySerializerTest.createArray("11", "haa", "ke")),
-				BinaryMap.valueOf(createArray(1, 5, 6, 7), BinaryArraySerializerTest.createArray("11", "lele", "haa", "ke"))
-		};
-	}
-
-	private static BinaryArray createArray(int... vs) {
-		BinaryArray array = new BinaryArray();
-		BinaryArrayWriter writer = new BinaryArrayWriter(array, vs.length, 8);
-		for (int i = 0; i < vs.length; i++) {
-			writer.writeInt(i, vs[i]);
-		}
-		writer.complete();
-		return array;
-	}
-}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
index 11f3ea3..f7b1f9e 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/BinaryRowSerializerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.dataformat.BinaryRowWriter;
 import org.apache.flink.table.dataformat.BinaryString;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link BinaryRowSerializer}.
  */
 public class BinaryRowSerializerTest extends SerializerTestBase<BinaryRow> {
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
index 8fdb7b0..9f9a477 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/typeutils/DecimalSerializerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.table.dataformat.Decimal;
 
 /**
- * A test for the {@link BinaryArraySerializer}.
+ * A test for the {@link DecimalSerializer}.
  */
 public class DecimalSerializerTest extends SerializerTestBase<Decimal> {