You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/11/07 10:19:09 UTC

[flink] 04/04: [FLINK-13702][table-planner-blink] Fixed BinaryGeneric & BinaryString materialization

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 814b5fd35fd0308f6edeedb8414623a43ba512a7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Nov 1 15:08:38 2019 +0100

    [FLINK-13702][table-planner-blink] Fixed BinaryGeneric & BinaryString materialization
---
 .../FirstValueWithRetractAggFunction.java          |   6 +-
 .../LastValueWithRetractAggFunction.java           |   4 +-
 .../flink/table/planner/codegen/CodeGenUtils.scala |   5 +-
 .../planner/codegen/SortCodeGeneratorTest.java     |  16 +-
 .../table/dataformat/AbstractBinaryWriter.java     |  31 +--
 .../apache/flink/table/dataformat/BinaryArray.java |   6 +-
 .../flink/table/dataformat/BinaryFormat.java       |  87 ++------
 .../flink/table/dataformat/BinaryGeneric.java      |  49 ++---
 .../apache/flink/table/dataformat/BinaryMap.java   |  12 +-
 .../apache/flink/table/dataformat/BinaryRow.java   |   8 +-
 .../flink/table/dataformat/BinarySection.java      |  74 +++++++
 .../flink/table/dataformat/BinaryString.java       | 239 ++++++++++++---------
 .../flink/table/dataformat/BinaryStringUtil.java   | 119 +++++-----
 .../flink/table/dataformat/BinaryWriter.java       |   5 +-
 .../table/dataformat/DataFormatConverters.java     |   2 +-
 .../flink/table/dataformat/LazyBinaryFormat.java   |  73 ++++---
 .../apache/flink/table/dataformat/NestedRow.java   |   5 +-
 .../runtime/typeutils/BinaryGenericSerializer.java |  76 +++----
 .../apache/flink/table/dataformat/BaseRowTest.java |  11 +-
 .../flink/table/dataformat/BinaryArrayTest.java    |  10 +-
 .../flink/table/dataformat/BinaryRowTest.java      |  63 ++++--
 .../flink/table/dataformat/BinaryStringTest.java   |  17 +-
 .../flink/table/dataformat/NestedRowTest.java      |  10 +-
 .../runtime/typeutils/BaseArraySerializerTest.java |   2 +-
 .../runtime/typeutils/BaseMapSerializerTest.java   |   2 +-
 .../typeutils/BinaryGenericSerializerTest.java     |  21 +-
 .../flink/table/utils/BinaryGenericAsserter.java   |  78 +++++++
 27 files changed, 607 insertions(+), 424 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
index 7bcbbc2..a81d1d8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
@@ -70,11 +70,9 @@ public abstract class FirstValueWithRetractAggFunction<T> extends AggregateFunct
 		acc.setField(0, null);
 		acc.setField(1, null);
 		acc.setField(2, new BinaryGeneric<>(
-			new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG)),
-			getValueToOrderMapViewSerializer()));
+			new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG))));
 		acc.setField(3, new BinaryGeneric<>(
-			new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType())),
-			getOrderToValueMapViewSerializer()));
+			new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType()))));
 		return acc;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
index 97f3c99..cd250a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
@@ -70,9 +70,9 @@ public abstract class LastValueWithRetractAggFunction<T> extends AggregateFuncti
 		acc.setField(0, null);
 		acc.setField(1, null);
 		acc.setField(2, new BinaryGeneric<>(
-				new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG)), getValueToOrderMapViewSerializer()));
+				new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG))));
 		acc.setField(3, new BinaryGeneric<>(
-				new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType())), getOrderToValueMapViewSerializer()));
+				new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType()))));
 		return acc;
 	}
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index da859ef..560c6a8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -653,8 +653,9 @@ object CodeGenUtils {
       case ROW =>
         val ser = ctx.addReusableTypeSerializer(t)
         s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $ser)"
-
-      case ANY => s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm)"
+      case ANY =>
+        val ser = ctx.addReusableTypeSerializer(t)
+        s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm, $ser)"
     }
 
   private def isConverterIdentity(t: DataType): Boolean = {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index c055078..5e1aeaf 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.runtime.types.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
@@ -82,6 +83,8 @@ import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
+import static org.junit.Assert.assertThat;
 
 /**
  * Random test for sort code generator.
@@ -262,7 +265,7 @@ public class SortCodeGeneratorTest {
 					}
 					break;
 				case ANY:
-					seeds[i] = new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+					seeds[i] = new BinaryGeneric<>(rnd.nextInt());
 					break;
 				default:
 					throw new RuntimeException("Not support!");
@@ -314,7 +317,7 @@ public class SortCodeGeneratorTest {
 			case ROW:
 				return GenericRow.of(new Object[]{null});
 			case ANY:
-				return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+				return new BinaryGeneric<>(rnd.nextInt());
 			default:
 				throw new RuntimeException("Not support!");
 		}
@@ -355,7 +358,7 @@ public class SortCodeGeneratorTest {
 					return GenericRow.of(GenericRow.of(new Object[]{null}));
 				}
 			case ANY:
-				return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+				return new BinaryGeneric<>(rnd.nextInt());
 			default:
 				throw new RuntimeException("Not support!");
 		}
@@ -396,7 +399,7 @@ public class SortCodeGeneratorTest {
 					return GenericRow.of(GenericRow.of(rnd.nextInt()));
 				}
 			case ANY:
-				return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+				return new BinaryGeneric<>(rnd.nextInt());
 			default:
 				throw new RuntimeException("Not support!");
 		}
@@ -562,6 +565,11 @@ public class SortCodeGeneratorTest {
 					Object o2 = TypeGetterSetters.get(result.get(i), keys[j], keyTypes[j]);
 					if (keyTypes[j] instanceof VarBinaryType) {
 						Assert.assertArrayEquals(msg, (byte[]) o1, (byte[]) o2);
+					} else if (keyTypes[j] instanceof TypeInformationAnyType) {
+						assertThat(
+							msg,
+							(BinaryGeneric) o1,
+							equivalent((BinaryGeneric) o2, new BinaryGenericSerializer<>(IntSerializer.INSTANCE)));
 					} else {
 						Assert.assertEquals(msg, o1, o2);
 					}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index 5091c3c..6499528 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
 import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 
 import java.io.IOException;
@@ -30,8 +32,6 @@ import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
-
 /**
  * Use the special format to write data to a {@link MemorySegment} (its capacity grows
  * automatically).
@@ -91,7 +91,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 
 	private void writeBytes(int pos, byte[] bytes) {
 		int len = bytes.length;
-		if (len <= MAX_FIX_PART_DATA_SIZE) {
+		if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
 			writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
 		} else {
 			writeBytesToVarLenPart(pos, bytes, len);
@@ -118,24 +118,11 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 	}
 
 	@Override
-	public void writeGeneric(int pos, BinaryGeneric input) {
-		if (input.getSegments() == null) {
-			int beforeCursor = cursor;
-			try {
-				input.getJavaObjectSerializer().serialize(input.getJavaObject(), getOutputView());
-			} catch (IOException e) {
-				throw new RuntimeException(e);
-			}
-			int size = cursor - beforeCursor;
-			final int roundedSize = roundNumberOfBytesToNearestWord(size);
-			int paddingBytes = roundedSize - size;
-			ensureCapacity(paddingBytes);
-			setOffsetAndSize(pos, beforeCursor, size);
-			zeroBytes(cursor, paddingBytes);
-			cursor += paddingBytes;
-		} else {
-			writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
-		}
+	@SuppressWarnings("unchecked")
+	public void writeGeneric(int pos, BinaryGeneric input, BinaryGenericSerializer serializer) {
+		TypeSerializer innerSerializer = serializer.getInnerSerializer();
+		input.ensureMaterialized(innerSerializer);
+		writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
 	}
 
 	@Override
@@ -152,7 +139,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
 	@Override
 	public void writeBinary(int pos, byte[] bytes) {
 		int len = bytes.length;
-		if (len <= MAX_FIX_PART_DATA_SIZE) {
+		if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
 			writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
 		} else {
 			writeBytesToVarLenPart(pos, bytes, len);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index acf5adc..523d96a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -35,7 +35,7 @@ import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
  *
  * <p>{@code BinaryArray} are influenced by Apache Spark UnsafeArrayData.
  */
-public final class BinaryArray extends BinaryFormat implements BaseArray {
+public final class BinaryArray extends BinarySection implements BaseArray {
 
 	/**
 	 * Offset for Arrays.
@@ -174,7 +174,7 @@ public final class BinaryArray extends BinaryFormat implements BaseArray {
 		assertIndexIsValid(pos);
 		int fieldOffset = getElementOffset(pos, 8);
 		final long offsetAndSize = SegmentsUtil.getLong(segments, fieldOffset);
-		return BinaryString.readBinaryStringFieldFromSegments(
+		return BinaryFormat.readBinaryStringFieldFromSegments(
 				segments, offset, fieldOffset, offsetAndSize);
 	}
 
@@ -205,7 +205,7 @@ public final class BinaryArray extends BinaryFormat implements BaseArray {
 		assertIndexIsValid(pos);
 		int fieldOffset = getElementOffset(pos, 8);
 		final long offsetAndSize = SegmentsUtil.getLong(segments, fieldOffset);
-		return readBinaryFieldFromSegments(
+		return BinaryFormat.readBinaryFieldFromSegments(
 				segments, offset, fieldOffset, offsetAndSize);
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
index 2a6ccc4..a95ac03 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.	See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.	You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *		http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,10 +22,9 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 
 /**
- * Binary format that in {@link MemorySegment}s.
+ * Binary format spanning {@link MemorySegment}s.
  */
-public abstract class BinaryFormat {
-
+public interface BinaryFormat {
 	/**
 	 * It decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRow}.
 	 *
@@ -36,8 +36,7 @@ public abstract class BinaryFormat {
 	 * 1-bit mark(1) = 0, 31-bits offset to the data, and 4-bytes length of data.
 	 * Data is stored in variable-length part.
 	 */
-	static final int MAX_FIX_PART_DATA_SIZE = 7;
-
+	int MAX_FIX_PART_DATA_SIZE = 7;
 	/**
 	 * To get the mark in highest bit of long.
 	 * Form: 10000000 00000000 ... (8 bytes)
@@ -45,8 +44,7 @@ public abstract class BinaryFormat {
 	 * <p>This is used to decide whether the data is stored in fixed-length part or variable-length
 	 * part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
 	 */
-	private static final long HIGHEST_FIRST_BIT = 0x80L << 56;
-
+	long HIGHEST_FIRST_BIT = 0x80L << 56;
 	/**
 	 * To get the 7 bits length in second bit to eighth bit out of a long.
 	 * Form: 01111111 00000000 ... (8 bytes)
@@ -54,58 +52,13 @@ public abstract class BinaryFormat {
 	 * <p>This is used to get the length of the data which is stored in this long.
 	 * see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
 	 */
-	private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
-
-	protected MemorySegment[] segments;
-	protected int offset;
-	protected int sizeInBytes;
-
-	public BinaryFormat() {}
-
-	public BinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
-		this.segments = segments;
-		this.offset = offset;
-		this.sizeInBytes = sizeInBytes;
-	}
-
-	public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
-		pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
-	}
-
-	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
-		this.segments = segments;
-		this.offset = offset;
-		this.sizeInBytes = sizeInBytes;
-	}
-
-	public MemorySegment[] getSegments() {
-		return segments;
-	}
-
-	public int getOffset() {
-		return offset;
-	}
+	long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
 
-	public int getSizeInBytes() {
-		return sizeInBytes;
-	}
+	MemorySegment[] getSegments();
 
-	@Override
-	public boolean equals(Object o) {
-		return this == o || o != null &&
-				getClass() == o.getClass() &&
-				binaryEquals((BinaryFormat) o);
-	}
+	int getOffset();
 
-	protected boolean binaryEquals(BinaryFormat that) {
-		return sizeInBytes == that.sizeInBytes &&
-				SegmentsUtil.equals(segments, offset, that.segments, that.offset, sizeInBytes);
-	}
-
-	@Override
-	public int hashCode() {
-		return SegmentsUtil.hash(segments, offset, sizeInBytes);
-	}
+	int getSizeInBytes();
 
 	/**
 	 * Get binary, if len less than 8, will be include in variablePartOffsetAndLen.
@@ -117,7 +70,9 @@ public abstract class BinaryFormat {
 	 * @param variablePartOffsetAndLen a long value, real data or offset and len.
 	 */
 	static byte[] readBinaryFieldFromSegments(
-			MemorySegment[] segments, int baseOffset, int fieldOffset,
+			MemorySegment[] segments,
+			int baseOffset,
+			int fieldOffset,
 			long variablePartOffsetAndLen) {
 		long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
 		if (mark == 0) {
@@ -145,7 +100,9 @@ public abstract class BinaryFormat {
 	 * @param variablePartOffsetAndLen a long value, real data or offset and len.
 	 */
 	static BinaryString readBinaryStringFieldFromSegments(
-			MemorySegment[] segments, int baseOffset, int fieldOffset,
+			MemorySegment[] segments,
+			int baseOffset,
+			int fieldOffset,
 			long variablePartOffsetAndLen) {
 		long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
 		if (mark == 0) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
index 390e43b..5200f36 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
@@ -31,48 +31,28 @@ import java.io.IOException;
  */
 public final class BinaryGeneric<T> extends LazyBinaryFormat<T> {
 
-	private final TypeSerializer<T> javaObjectSer;
-
-	public BinaryGeneric(T javaObject, TypeSerializer<T> javaObjectSer) {
+	public BinaryGeneric(T javaObject) {
 		super(javaObject);
-		this.javaObjectSer = javaObjectSer;
 	}
 
-	public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes,
-			TypeSerializer<T> javaObjectSer) {
+	public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes) {
 		super(segments, offset, sizeInBytes);
-		this.javaObjectSer = javaObjectSer;
 	}
 
-	public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject,
-			TypeSerializer<T> javaObjectSer) {
+	public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
 		super(segments, offset, sizeInBytes, javaObject);
-		this.javaObjectSer = javaObjectSer;
-	}
-
-	public TypeSerializer<T> getJavaObjectSerializer() {
-		return javaObjectSer;
 	}
 
 	@Override
-	public void materialize() {
+	protected BinarySection materialize(TypeSerializer<T> serializer) {
 		try {
-			byte[] bytes = InstantiationUtil.serializeToByteArray(javaObjectSer, javaObject);
-			pointTo(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+			byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
+			return new BinarySection(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
 	}
 
-	public BinaryGeneric<T> copy() {
-		ensureMaterialized();
-		byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, sizeInBytes);
-		T newJavaObject = javaObject == null ? null : javaObjectSer.copy(javaObject);
-		return new BinaryGeneric<>(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, sizeInBytes,
-				newJavaObject,
-				javaObjectSer);
-	}
-
 	static <T> BinaryGeneric<T> readBinaryGenericFieldFromSegments(
 			MemorySegment[] segments, int baseOffset, long offsetAndSize) {
 		final int size = ((int) offsetAndSize);
@@ -91,4 +71,21 @@ public final class BinaryGeneric<T> extends LazyBinaryFormat<T> {
 		}
 		return value.getJavaObject();
 	}
+
+	@Override
+	public boolean equals(Object o) {
+		throw new UnsupportedOperationException("BinaryGeneric cannot be compared");
+	}
+
+	@Override
+	public int hashCode() {
+		throw new UnsupportedOperationException("BinaryGeneric does not have a hashCode");
+	}
+
+	@Override
+	public String toString() {
+		return "BinaryGeneric{" +
+			"javaObject=" + javaObject +
+			'}';
+	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
index 8660fee..679cff9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
  */
-public final class BinaryMap extends BinaryFormat implements BaseMap {
+public final class BinaryMap extends BinarySection implements BaseMap {
 
 	private final BinaryArray keys;
 	private final BinaryArray values;
@@ -102,13 +102,13 @@ public final class BinaryMap extends BinaryFormat implements BaseMap {
 	}
 
 	public static BinaryMap valueOf(BinaryArray key, BinaryArray value) {
-		checkArgument(key.getSegments().length == 1 && value.getSegments().length == 1);
-		byte[] bytes = new byte[4 + key.getSizeInBytes() + value.getSizeInBytes()];
+		checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
+		byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
 		MemorySegment segment = MemorySegmentFactory.wrap(bytes);
-		segment.putInt(0, key.getSizeInBytes());
-		key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.getSizeInBytes());
+		segment.putInt(0, key.sizeInBytes);
+		key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
 		value.getSegments()[0].copyTo(
-				value.getOffset(), segment, 4 + key.getSizeInBytes(), value.getSizeInBytes());
+				value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
 		BinaryMap map = new BinaryMap();
 		map.pointTo(segment, 0, bytes.length);
 		return map;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index bc98c43..75f7e24 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -52,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * The difference is that BinaryRow is placed on a discontinuous memory, and the variable length
  * type can also be placed on a fixed length area (If it's short enough).
  */
-public final class BinaryRow extends BinaryFormat implements BaseRow {
+public final class BinaryRow extends BinarySection implements BaseRow {
 
 	public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
 	private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
@@ -287,7 +287,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
 		final long offsetAndLen = segments[0].getLong(fieldOffset);
-		return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+		return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
 	}
 
 	@Override
@@ -315,7 +315,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
 		final long offsetAndLen = segments[0].getLong(fieldOffset);
-		return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+		return BinaryFormat.readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
 	}
 
 	@Override
@@ -411,7 +411,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
 	}
 
 	private boolean equalsFrom(Object o, int startIndex) {
-		if (o != null && o instanceof BinaryRow) {
+		if (o instanceof BinaryRow) {
 			BinaryRow other = (BinaryRow) o;
 			return sizeInBytes == other.sizeInBytes &&
 					SegmentsUtil.equals(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java
new file mode 100644
index 0000000..afde68b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.runtime.util.SegmentsUtil;
+
+/**
+ * Binary section of memory.
+ */
+public class BinarySection implements BinaryFormat {
+
+	protected MemorySegment[] segments;
+	protected int offset;
+	protected int sizeInBytes;
+
+	public BinarySection() {}
+
+	public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) {
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+		pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
+	}
+
+	public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+		this.segments = segments;
+		this.offset = offset;
+		this.sizeInBytes = sizeInBytes;
+	}
+
+	public MemorySegment[] getSegments() {
+		return segments;
+	}
+
+	public int getOffset() {
+		return offset;
+	}
+
+	public int getSizeInBytes() {
+		return sizeInBytes;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		return this == o || o != null &&
+			getClass() == o.getClass() &&
+			sizeInBytes == ((BinarySection) o).sizeInBytes &&
+			SegmentsUtil.equals(segments, offset, ((BinarySection) o).segments, ((BinarySection) o).offset, sizeInBytes);
+	}
+
+	@Override
+	public int hashCode() {
+		return SegmentsUtil.hash(segments, offset, sizeInBytes);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
index f3db1eb..78240a6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -18,6 +18,7 @@
 package org.apache.flink.table.dataformat;
 
 import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfoFactory;
@@ -116,7 +117,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		ensureMaterialized();
 		if (inFirstSegment()) {
 			int len = 0;
-			for (int i = 0; i < sizeInBytes; i += numBytesForFirstByte(getByteOneSegment(i))) {
+			for (int i = 0; i < binarySection.sizeInBytes; i += numBytesForFirstByte(getByteOneSegment(i))) {
 				len++;
 			}
 			return len;
@@ -127,10 +128,10 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 	private int numCharsMultiSegs() {
 		int len = 0;
-		int segSize = segments[0].size();
+		int segSize = binarySection.segments[0].size();
 		SegmentAndOffset index = firstSegmentAndOffset(segSize);
 		int i = 0;
-		while (i < sizeInBytes) {
+		while (i < binarySection.sizeInBytes) {
 			int charBytes = numBytesForFirstByte(index.value());
 			i += charBytes;
 			len++;
@@ -141,7 +142,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 	/**
 	 * Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to
-	 * {@code getSizeInBytes() - 1}.
+	 * {@code binarySection.sizeInBytes - 1}.
 	 *
 	 * @param      index   the index of the {@code byte} value.
 	 * @return     the {@code byte} value at the specified index of this UTF-8 bytes.
@@ -151,12 +152,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	 */
 	public byte byteAt(int index) {
 		ensureMaterialized();
-		int globalOffset = offset + index;
-		int size = segments[0].size();
+		int globalOffset = binarySection.offset + index;
+		int size = binarySection.segments[0].size();
 		if (globalOffset < size) {
-			return segments[0].get(globalOffset);
+			return binarySection.segments[0].get(globalOffset);
 		} else {
-			return segments[globalOffset / size].get(globalOffset % size);
+			return binarySection.segments[globalOffset / size].get(globalOffset % size);
 		}
 	}
 
@@ -165,12 +166,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	 */
 	public byte[] getBytes() {
 		ensureMaterialized();
-		return SegmentsUtil.getBytes(segments, offset, sizeInBytes);
+		return SegmentsUtil.getBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
 	}
 
 	@Override
 	public boolean equals(Object o) {
-		if (o != null && o instanceof BinaryString) {
+		if (o instanceof BinaryString) {
 			BinaryString other = (BinaryString) o;
 			if (javaObject != null && other.javaObject != null) {
 				return javaObject.equals(other.javaObject);
@@ -178,28 +179,62 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 			ensureMaterialized();
 			other.ensureMaterialized();
-			return binaryEquals(other);
+			return binarySection.equals(other.binarySection);
 		} else {
 			return false;
 		}
 	}
 
 	@Override
+	public int hashCode() {
+		ensureMaterialized();
+		return binarySection.hashCode();
+	}
+
+	@Override
 	public String toString() {
 		if (javaObject == null) {
-			byte[] bytes = SegmentsUtil.allocateReuseBytes(sizeInBytes);
-			SegmentsUtil.copyToBytes(segments, offset, bytes, 0, sizeInBytes);
-			javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, sizeInBytes);
+			byte[] bytes = SegmentsUtil.allocateReuseBytes(binarySection.sizeInBytes);
+			SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset, bytes, 0, binarySection.sizeInBytes);
+			javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, binarySection.sizeInBytes);
 		}
 		return javaObject;
 	}
 
 	@Override
-	public void materialize() {
+	public MemorySegment[] getSegments() {
+		ensureMaterialized();
+		return super.getSegments();
+	}
+
+	@Override
+	public int getOffset() {
+		ensureMaterialized();
+		return super.getOffset();
+	}
+
+	@Override
+	public int getSizeInBytes() {
+		ensureMaterialized();
+		return super.getSizeInBytes();
+	}
+
+	public void ensureMaterialized() {
+		ensureMaterialized(null);
+	}
+
+	@Override
+	protected BinarySection materialize(TypeSerializer<String> serializer) {
+		if (serializer != null) {
+			throw new IllegalArgumentException("BinaryString does not support custom serializers");
+		}
+
 		byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
-		segments = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
-		offset = 0;
-		sizeInBytes = bytes.length;
+		return new BinarySection(
+			new MemorySegment[]{MemorySegmentFactory.wrap(bytes)},
+			0,
+			bytes.length
+		);
 	}
 
 	/**
@@ -207,9 +242,9 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	 */
 	public BinaryString copy() {
 		ensureMaterialized();
-		byte[] copy = SegmentsUtil.copyToBytes(segments, offset, sizeInBytes);
+		byte[] copy = SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
 		return new BinaryString(new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
-				0, sizeInBytes, javaObject);
+				0, binarySection.sizeInBytes, javaObject);
 	}
 
 	/**
@@ -227,19 +262,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 		ensureMaterialized();
 		other.ensureMaterialized();
-		if (segments.length == 1 && other.segments.length == 1) {
+		if (binarySection.segments.length == 1 && other.binarySection.segments.length == 1) {
 
-			int len = Math.min(sizeInBytes, other.sizeInBytes);
-			MemorySegment seg1 = segments[0];
-			MemorySegment seg2 = other.segments[0];
+			int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
+			MemorySegment seg1 = binarySection.segments[0];
+			MemorySegment seg2 = other.binarySection.segments[0];
 
 			for (int i = 0; i < len; i++) {
-				int res = (seg1.get(offset + i) & 0xFF) - (seg2.get(other.offset + i) & 0xFF);
+				int res =
+					(seg1.get(binarySection.offset + i) & 0xFF) - (seg2.get(other.binarySection.offset + i) & 0xFF);
 				if (res != 0) {
 					return res;
 				}
 			}
-			return sizeInBytes - other.sizeInBytes;
+			return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
 		}
 
 		// if there are multi segments.
@@ -251,20 +287,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	 */
 	private int compareMultiSegments(BinaryString other) {
 
-		if (sizeInBytes == 0 || other.sizeInBytes == 0) {
-			return sizeInBytes - other.sizeInBytes;
+		if (binarySection.sizeInBytes == 0 || other.binarySection.sizeInBytes == 0) {
+			return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
 		}
 
-		int len = Math.min(sizeInBytes, other.sizeInBytes);
+		int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
 
-		MemorySegment seg1 = segments[0];
-		MemorySegment seg2 = other.segments[0];
+		MemorySegment seg1 = binarySection.segments[0];
+		MemorySegment seg2 = other.binarySection.segments[0];
 
-		int segmentSize = segments[0].size();
-		int otherSegmentSize = other.segments[0].size();
+		int segmentSize = binarySection.segments[0].size();
+		int otherSegmentSize = other.binarySection.segments[0].size();
 
-		int sizeOfFirst1 = segmentSize - offset;
-		int sizeOfFirst2 = otherSegmentSize - other.offset;
+		int sizeOfFirst1 = segmentSize - binarySection.offset;
+		int sizeOfFirst2 = otherSegmentSize - other.binarySection.offset;
 
 		int varSegIndex1 = 1;
 		int varSegIndex2 = 1;
@@ -272,12 +308,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		// find the first segment of this string.
 		while (sizeOfFirst1 <= 0) {
 			sizeOfFirst1 += segmentSize;
-			seg1 = segments[varSegIndex1++];
+			seg1 = binarySection.segments[varSegIndex1++];
 		}
 
 		while (sizeOfFirst2 <= 0) {
 			sizeOfFirst2 += otherSegmentSize;
-			seg2 = other.segments[varSegIndex2++];
+			seg2 = other.binarySection.segments[varSegIndex2++];
 		}
 
 		int offset1 = segmentSize - sizeOfFirst1;
@@ -299,20 +335,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 			len -= needCompare;
 			// next segment
 			if (sizeOfFirst1 < sizeOfFirst2) { //I am smaller
-				seg1 = segments[varSegIndex1++];
+				seg1 = binarySection.segments[varSegIndex1++];
 				offset1 = 0;
 				offset2 += needCompare;
 				sizeOfFirst1 = segmentSize;
 				sizeOfFirst2 -= needCompare;
 			} else if (sizeOfFirst1 > sizeOfFirst2) { //other is smaller
-				seg2 = other.segments[varSegIndex2++];
+				seg2 = other.binarySection.segments[varSegIndex2++];
 				offset2 = 0;
 				offset1 += needCompare;
 				sizeOfFirst2 = otherSegmentSize;
 				sizeOfFirst1 -= needCompare;
 			} else { // same, should go ahead both.
-				seg1 = segments[varSegIndex1++];
-				seg2 = other.segments[varSegIndex2++];
+				seg1 = binarySection.segments[varSegIndex1++];
+				seg2 = other.binarySection.segments[varSegIndex2++];
 				offset1 = 0;
 				offset2 = 0;
 				sizeOfFirst1 = segmentSize;
@@ -323,7 +359,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 		checkArgument(needCompare == len);
 
-		return sizeInBytes - other.sizeInBytes;
+		return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
 	}
 
 	/**
@@ -343,27 +379,27 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	 */
 	public BinaryString substring(int beginIndex, int endIndex) {
 		ensureMaterialized();
-		if (endIndex <= beginIndex || beginIndex >= sizeInBytes) {
+		if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) {
 			return EMPTY_UTF8;
 		}
 		if (inFirstSegment()) {
-			MemorySegment segment = segments[0];
+			MemorySegment segment = binarySection.segments[0];
 			int i = 0;
 			int c = 0;
-			while (i < sizeInBytes && c < beginIndex) {
-				i += numBytesForFirstByte(segment.get(i + offset));
+			while (i < binarySection.sizeInBytes && c < beginIndex) {
+				i += numBytesForFirstByte(segment.get(i + binarySection.offset));
 				c += 1;
 			}
 
 			int j = i;
-			while (i < sizeInBytes && c < endIndex) {
-				i += numBytesForFirstByte(segment.get(i + offset));
+			while (i < binarySection.sizeInBytes && c < endIndex) {
+				i += numBytesForFirstByte(segment.get(i + binarySection.offset));
 				c += 1;
 			}
 
 			if (i > j) {
 				byte[] bytes = new byte[i - j];
-				segment.get(offset + j, bytes, 0, i - j);
+				segment.get(binarySection.offset + j, bytes, 0, i - j);
 				return fromBytes(bytes);
 			} else {
 				return EMPTY_UTF8;
@@ -374,11 +410,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	}
 
 	private BinaryString substringMultiSegs(final int start, final int until) {
-		int segSize = segments[0].size();
+		int segSize = binarySection.segments[0].size();
 		SegmentAndOffset index = firstSegmentAndOffset(segSize);
 		int i = 0;
 		int c = 0;
-		while (i < sizeInBytes && c < start) {
+		while (i < binarySection.sizeInBytes && c < start) {
 			int charSize = numBytesForFirstByte(index.value());
 			i += charSize;
 			index.skipBytes(charSize, segSize);
@@ -386,7 +422,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		}
 
 		int j = i;
-		while (i < sizeInBytes && c < until) {
+		while (i < binarySection.sizeInBytes && c < until) {
 			int charSize = numBytesForFirstByte(index.value());
 			i += charSize;
 			index.skipBytes(charSize, segSize);
@@ -394,7 +430,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		}
 
 		if (i > j) {
-			return fromBytes(SegmentsUtil.copyToBytes(segments, offset + j, i - j));
+			return fromBytes(SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset + j, i - j));
 		} else {
 			return EMPTY_UTF8;
 		}
@@ -410,12 +446,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	public boolean contains(final BinaryString s) {
 		ensureMaterialized();
 		s.ensureMaterialized();
-		if (s.sizeInBytes == 0) {
+		if (s.binarySection.sizeInBytes == 0) {
 			return true;
 		}
 		int find = SegmentsUtil.find(
-			segments, offset, sizeInBytes,
-			s.segments, s.offset, s.sizeInBytes);
+			binarySection.segments, binarySection.offset, binarySection.sizeInBytes,
+			s.binarySection.segments, s.binarySection.offset, s.binarySection.sizeInBytes);
 		return find != -1;
 	}
 
@@ -446,7 +482,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	public boolean endsWith(final BinaryString suffix) {
 		ensureMaterialized();
 		suffix.ensureMaterialized();
-		return matchAt(suffix, sizeInBytes - suffix.sizeInBytes);
+		return matchAt(suffix, binarySection.sizeInBytes - suffix.binarySection.sizeInBytes);
 	}
 
 	/**
@@ -461,9 +497,9 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		ensureMaterialized();
 		if (inFirstSegment()) {
 			int s = 0;
-			int e = this.sizeInBytes - 1;
+			int e = this.binarySection.sizeInBytes - 1;
 			// skip all of the space (0x20) in the left side
-			while (s < this.sizeInBytes && getByteOneSegment(s) == 0x20) {
+			while (s < this.binarySection.sizeInBytes && getByteOneSegment(s) == 0x20) {
 				s++;
 			}
 			// skip all of the space (0x20) in the right side
@@ -483,11 +519,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 	private BinaryString trimMultiSegs() {
 		int s = 0;
-		int e = this.sizeInBytes - 1;
-		int segSize = segments[0].size();
+		int e = this.binarySection.sizeInBytes - 1;
+		int segSize = binarySection.segments[0].size();
 		SegmentAndOffset front = firstSegmentAndOffset(segSize);
 		// skip all of the space (0x20) in the left side
-		while (s < this.sizeInBytes && front.value() == 0x20) {
+		while (s < this.binarySection.sizeInBytes && front.value() == 0x20) {
 			s++;
 			front.nextByte(segSize);
 		}
@@ -518,7 +554,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	public int indexOf(BinaryString str, int fromIndex) {
 		ensureMaterialized();
 		str.ensureMaterialized();
-		if (str.sizeInBytes == 0) {
+		if (str.binarySection.sizeInBytes == 0) {
 			return 0;
 		}
 		if (inFirstSegment()) {
@@ -526,21 +562,21 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 			int byteIdx = 0;
 			// position is char
 			int charIdx = 0;
-			while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+			while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
 				byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
 				charIdx++;
 			}
 			do {
-				if (byteIdx + str.sizeInBytes > sizeInBytes) {
+				if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
 					return -1;
 				}
-				if (SegmentsUtil.equals(segments, offset + byteIdx,
-						str.segments, str.offset, str.sizeInBytes)) {
+				if (SegmentsUtil.equals(binarySection.segments, binarySection.offset + byteIdx,
+						str.binarySection.segments, str.binarySection.offset, str.binarySection.sizeInBytes)) {
 					return charIdx;
 				}
 				byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
 				charIdx++;
-			} while (byteIdx < sizeInBytes);
+			} while (byteIdx < binarySection.sizeInBytes);
 
 			return -1;
 		} else {
@@ -553,27 +589,27 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		int byteIdx = 0;
 		// position is char
 		int charIdx = 0;
-		int segSize = segments[0].size();
+		int segSize = binarySection.segments[0].size();
 		SegmentAndOffset index = firstSegmentAndOffset(segSize);
-		while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+		while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
 			int charBytes = numBytesForFirstByte(index.value());
 			byteIdx += charBytes;
 			charIdx++;
 			index.skipBytes(charBytes, segSize);
 		}
 		do {
-			if (byteIdx + str.sizeInBytes > sizeInBytes) {
+			if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
 				return -1;
 			}
-			if (SegmentsUtil.equals(segments, offset + byteIdx,
-					str.segments, str.offset, str.sizeInBytes)) {
+			if (SegmentsUtil.equals(binarySection.segments, binarySection.offset + byteIdx,
+					str.binarySection.segments, str.binarySection.offset, str.binarySection.sizeInBytes)) {
 				return charIdx;
 			}
 			int charBytes = numBytesForFirstByte(index.segment.get(index.offset));
 			byteIdx += charBytes;
 			charIdx++;
 			index.skipBytes(charBytes, segSize);
-		} while (byteIdx < sizeInBytes);
+		} while (byteIdx < binarySection.sizeInBytes);
 
 		return -1;
 	}
@@ -587,14 +623,14 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		if (javaObject != null) {
 			return javaToUpperCase();
 		}
-		if (sizeInBytes == 0) {
+		if (binarySection.sizeInBytes == 0) {
 			return EMPTY_UTF8;
 		}
-		int size = segments[0].size();
+		int size = binarySection.segments[0].size();
 		SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
-		byte[] bytes = new byte[sizeInBytes];
+		byte[] bytes = new byte[binarySection.sizeInBytes];
 		bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
-		for (int i = 0; i < sizeInBytes; i++) {
+		for (int i = 0; i < binarySection.sizeInBytes; i++) {
 			byte b = segmentAndOffset.value();
 			if (numBytesForFirstByte(b) != 1) {
 				// fallback
@@ -624,14 +660,14 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 		if (javaObject != null) {
 			return javaToLowerCase();
 		}
-		if (sizeInBytes == 0) {
+		if (binarySection.sizeInBytes == 0) {
 			return EMPTY_UTF8;
 		}
-		int size = segments[0].size();
+		int size = binarySection.segments[0].size();
 		SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
-		byte[] bytes = new byte[sizeInBytes];
+		byte[] bytes = new byte[binarySection.sizeInBytes];
 		bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
-		for (int i = 0; i < sizeInBytes; i++) {
+		for (int i = 0; i < binarySection.sizeInBytes; i++) {
 			byte b = segmentAndOffset.value();
 			if (numBytesForFirstByte(b) != 1) {
 				// fallback
@@ -657,11 +693,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	// ------------------------------------------------------------------------------------------
 
 	byte getByteOneSegment(int i) {
-		return segments[0].get(offset + i);
+		return binarySection.segments[0].get(binarySection.offset + i);
 	}
 
 	boolean inFirstSegment() {
-		return sizeInBytes + offset <= segments[0].size();
+		return binarySection.sizeInBytes + binarySection.offset <= binarySection.segments[0].size();
 	}
 
 	private boolean matchAt(final BinaryString s, int pos) {
@@ -669,41 +705,50 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 	}
 
 	private boolean matchAtOneSeg(final BinaryString s, int pos) {
-		return s.sizeInBytes + pos <= sizeInBytes && pos >= 0 &&
-			segments[0].equalTo(s.segments[0], offset + pos, s.offset, s.sizeInBytes);
+		return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+			binarySection.segments[0].equalTo(
+				s.binarySection.segments[0],
+				binarySection.offset + pos,
+				s.binarySection.offset,
+				s.binarySection.sizeInBytes);
 	}
 
 	private boolean matchAtVarSeg(final BinaryString s, int pos) {
-		return s.sizeInBytes + pos <= sizeInBytes && pos >= 0 &&
-			SegmentsUtil.equals(segments, offset + pos, s.segments, s.offset, s.sizeInBytes);
+		return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+			SegmentsUtil.equals(
+				binarySection.segments,
+				binarySection.offset + pos,
+				s.binarySection.segments,
+				s.binarySection.offset,
+				s.binarySection.sizeInBytes);
 	}
 
 	BinaryString copyBinaryStringInOneSeg(int start, int len) {
 		byte[] newBytes = new byte[len];
-		segments[0].get(offset + start, newBytes, 0, len);
+		binarySection.segments[0].get(binarySection.offset + start, newBytes, 0, len);
 		return fromBytes(newBytes);
 	}
 
 	BinaryString copyBinaryString(int start, int end) {
 		int len = end - start + 1;
 		byte[] newBytes = new byte[len];
-		SegmentsUtil.copyToBytes(segments, offset + start, newBytes, 0, len);
+		SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset + start, newBytes, 0, len);
 		return fromBytes(newBytes);
 	}
 
 	SegmentAndOffset firstSegmentAndOffset(int segSize) {
-		int segIndex = offset / segSize;
-		return new SegmentAndOffset(segIndex, offset % segSize);
+		int segIndex = binarySection.offset / segSize;
+		return new SegmentAndOffset(segIndex, binarySection.offset % segSize);
 	}
 
 	SegmentAndOffset lastSegmentAndOffset(int segSize) {
-		int lastOffset = offset + sizeInBytes - 1;
+		int lastOffset = binarySection.offset + binarySection.sizeInBytes - 1;
 		int segIndex = lastOffset / segSize;
 		return new SegmentAndOffset(segIndex, lastOffset % segSize);
 	}
 
 	private SegmentAndOffset startSegmentAndOffset(int segSize) {
-		return inFirstSegment() ? new SegmentAndOffset(0, offset) : firstSegmentAndOffset(segSize);
+		return inFirstSegment() ? new SegmentAndOffset(0, binarySection.offset) : firstSegmentAndOffset(segSize);
 	}
 
 	/**
@@ -716,13 +761,13 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
 
 		private SegmentAndOffset(int segIndex, int offset) {
 			this.segIndex = segIndex;
-			this.segment = segments[segIndex];
+			this.segment = binarySection.segments[segIndex];
 			this.offset = offset;
 		}
 
 		private void assignSegment() {
-			segment = segIndex >= 0 && segIndex < segments.length ?
-					segments[segIndex] : null;
+			segment = segIndex >= 0 && segIndex < binarySection.segments.length ?
+					binarySection.segments[segIndex] : null;
 		}
 
 		void previousByte(int segSize) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
index 6faa51a..021a8af 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
@@ -582,9 +582,9 @@ public class BinaryStringUtil {
 			int byteIdx = 0;
 			// position of last split1
 			int lastSplit1Idx = -1;
-			while (byteIdx < str.sizeInBytes) {
+			while (byteIdx < str.getSizeInBytes()) {
 				// If find next split1 in str, process current kv
-				if (str.segments[0].get(str.offset + byteIdx) == split1) {
+				if (str.getSegments()[0].get(str.getOffset() + byteIdx) == split1) {
 					int currentKeyIdx = lastSplit1Idx + 1;
 					// If key of current kv is keyName, return the value directly
 					BinaryString value = findValueOfKey(str, split2, keyName, currentKeyIdx, byteIdx);
@@ -597,7 +597,7 @@ public class BinaryStringUtil {
 			}
 			// process the string which is not ends with split1
 			int currentKeyIdx = lastSplit1Idx + 1;
-			return findValueOfKey(str, split2, keyName, currentKeyIdx, str.sizeInBytes);
+			return findValueOfKey(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
 		} else {
 			return keyValueSlow(str, split1, split2, keyName);
 		}
@@ -609,16 +609,16 @@ public class BinaryStringUtil {
 			BinaryString keyName,
 			int start,
 			int end) {
-		int keyNameLen = keyName.sizeInBytes;
+		int keyNameLen = keyName.getSizeInBytes();
 		for (int idx = start; idx < end; idx++) {
-			if (str.segments[0].get(str.offset + idx) == split) {
+			if (str.getSegments()[0].get(str.getOffset() + idx) == split) {
 				if (idx == start + keyNameLen &&
-						str.segments[0].equalTo(keyName.segments[0], str.offset + start,
-								keyName.offset, keyNameLen)) {
+						str.getSegments()[0].equalTo(keyName.getSegments()[0], str.getOffset() + start,
+								keyName.getOffset(), keyNameLen)) {
 					int valueIdx = idx + 1;
 					int valueLen = end - valueIdx;
 					byte[] bytes = new byte[valueLen];
-					str.segments[0].get(str.offset + valueIdx, bytes, 0, valueLen);
+					str.getSegments()[0].get(str.getOffset() + valueIdx, bytes, 0, valueLen);
 					return fromBytes(bytes, 0, valueLen);
 				} else {
 					return null;
@@ -637,7 +637,7 @@ public class BinaryStringUtil {
 		int byteIdx = 0;
 		// position of last split1
 		int lastSplit1Idx = -1;
-		while (byteIdx < str.sizeInBytes) {
+		while (byteIdx < str.getSizeInBytes()) {
 			// If find next split1 in str, process current kv
 			if (str.byteAt(byteIdx) == split1) {
 				int currentKeyIdx = lastSplit1Idx + 1;
@@ -650,7 +650,7 @@ public class BinaryStringUtil {
 			byteIdx++;
 		}
 		int currentKeyIdx = lastSplit1Idx + 1;
-		return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.sizeInBytes);
+		return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
 	}
 
 	private static BinaryString findValueOfKeySlow(
@@ -659,14 +659,14 @@ public class BinaryStringUtil {
 			BinaryString keyName,
 			int start,
 			int end) {
-		int keyNameLen = keyName.sizeInBytes;
+		int keyNameLen = keyName.getSizeInBytes();
 		for (int idx = start; idx < end; idx++) {
 			if (str.byteAt(idx) == split) {
 				if (idx == start + keyNameLen &&
-						SegmentsUtil.equals(str.segments, str.offset + start, keyName.segments,
-								keyName.offset, keyNameLen)) {
+						SegmentsUtil.equals(str.getSegments(), str.getOffset() + start, keyName.getSegments(),
+								keyName.getOffset(), keyNameLen)) {
 					int valueIdx = idx + 1;
-					byte[] bytes = SegmentsUtil.copyToBytes(str.segments, str.offset + valueIdx, end - valueIdx);
+					byte[] bytes = SegmentsUtil.copyToBytes(str.getSegments(), str.getOffset() + valueIdx, end - valueIdx);
 					return fromBytes(bytes);
 				} else {
 					return null;
@@ -740,8 +740,8 @@ public class BinaryStringUtil {
 		int offset = 0;
 		for (BinaryString input : inputs) {
 			if (input != null) {
-				int len = input.sizeInBytes;
-				SegmentsUtil.copyToBytes(input.segments, input.offset, result, offset, len);
+				int len = input.getSizeInBytes();
+				SegmentsUtil.copyToBytes(input.getSegments(), input.getOffset(), result, offset, len);
 				offset += len;
 			}
 		}
@@ -771,7 +771,7 @@ public class BinaryStringUtil {
 		for (BinaryString input : inputs) {
 			if (input != null) {
 				input.ensureMaterialized();
-				numInputBytes += input.sizeInBytes;
+				numInputBytes += input.getSizeInBytes();
 				numInputs++;
 			}
 		}
@@ -783,21 +783,26 @@ public class BinaryStringUtil {
 
 		// Allocate a new byte array, and copy the inputs one by one into it.
 		// The size of the new array is the size of all inputs, plus the separators.
-		final byte[] result = new byte[numInputBytes + (numInputs - 1) * separator.sizeInBytes];
+		final byte[] result = new byte[numInputBytes + (numInputs - 1) * separator.getSizeInBytes()];
 		int offset = 0;
 
 		int j = 0;
 		for (BinaryString input : inputs) {
 			if (input != null) {
-				int len = input.sizeInBytes;
-				SegmentsUtil.copyToBytes(input.segments, input.offset, result, offset, len);
+				int len = input.getSizeInBytes();
+				SegmentsUtil.copyToBytes(input.getSegments(), input.getOffset(), result, offset, len);
 				offset += len;
 
 				j++;
 				// Add separator if this is not the last input.
 				if (j < numInputs) {
-					SegmentsUtil.copyToBytes(separator.segments, separator.offset, result, offset, separator.sizeInBytes);
-					offset += separator.sizeInBytes;
+					SegmentsUtil.copyToBytes(
+						separator.getSegments(),
+						separator.getOffset(),
+						result,
+						offset,
+						separator.getSizeInBytes());
+					offset += separator.getSizeInBytes();
 				}
 			}
 		}
@@ -812,13 +817,13 @@ public class BinaryStringUtil {
 	public static BinaryString reverse(BinaryString str) {
 		str.ensureMaterialized();
 		if (str.inFirstSegment()) {
-			byte[] result = new byte[str.sizeInBytes];
+			byte[] result = new byte[str.getSizeInBytes()];
 			// position in byte
 			int byteIdx = 0;
-			while (byteIdx < str.sizeInBytes) {
+			while (byteIdx < str.getSizeInBytes()) {
 				int charBytes = numBytesForFirstByte(str.getByteOneSegment(byteIdx));
-				str.segments[0].get(
-						str.offset + byteIdx,
+				str.getSegments()[0].get(
+						str.getOffset() + byteIdx,
 						result,
 						result.length - byteIdx - charBytes,
 						charBytes);
@@ -831,16 +836,16 @@ public class BinaryStringUtil {
 	}
 
 	private static BinaryString reverseMultiSegs(BinaryString str) {
-		byte[] result = new byte[str.sizeInBytes];
+		byte[] result = new byte[str.getSizeInBytes()];
 		// position in byte
 		int byteIdx = 0;
-		int segSize = str.segments[0].size();
+		int segSize = str.getSegments()[0].size();
 		BinaryString.SegmentAndOffset index = str.firstSegmentAndOffset(segSize);
-		while (byteIdx <  str.sizeInBytes) {
+		while (byteIdx <  str.getSizeInBytes()) {
 			int charBytes = numBytesForFirstByte(index.value());
 			SegmentsUtil.copyMultiSegmentsToBytes(
-					str.segments,
-					str.offset + byteIdx,
+					str.getSegments(),
+					str.getOffset() + byteIdx,
 					result,
 					result.length - byteIdx - charBytes,
 					charBytes);
@@ -870,14 +875,14 @@ public class BinaryStringUtil {
 		if (str.inFirstSegment()) {
 			int s = 0;
 			// skip all of the space (0x20) in the left side
-			while (s < str.sizeInBytes && str.getByteOneSegment(s) == 0x20) {
+			while (s < str.getSizeInBytes() && str.getByteOneSegment(s) == 0x20) {
 				s++;
 			}
-			if (s == str.sizeInBytes) {
+			if (s == str.getSizeInBytes()) {
 				// empty string
 				return EMPTY_UTF8;
 			} else {
-				return str.copyBinaryStringInOneSeg(s, str.sizeInBytes - s);
+				return str.copyBinaryStringInOneSeg(s, str.getSizeInBytes() - s);
 			}
 		} else {
 			return trimLeftSlow(str);
@@ -886,18 +891,18 @@ public class BinaryStringUtil {
 
 	private static BinaryString trimLeftSlow(BinaryString str) {
 		int s = 0;
-		int segSize = str.segments[0].size();
+		int segSize = str.getSegments()[0].size();
 		BinaryString.SegmentAndOffset front = str.firstSegmentAndOffset(segSize);
 		// skip all of the space (0x20) in the left side
-		while (s < str.sizeInBytes && front.value() == 0x20) {
+		while (s < str.getSizeInBytes() && front.value() == 0x20) {
 			s++;
 			front.nextByte(segSize);
 		}
-		if (s == str.sizeInBytes) {
+		if (s == str.getSizeInBytes()) {
 			// empty string
 			return EMPTY_UTF8;
 		} else {
-			return str.copyBinaryString(s, str.sizeInBytes - 1);
+			return str.copyBinaryString(s, str.getSizeInBytes() - 1);
 		}
 	}
 
@@ -929,7 +934,7 @@ public class BinaryStringUtil {
 		}
 		if (str.inFirstSegment()) {
 			int searchIdx = 0;
-			while (searchIdx < str.sizeInBytes) {
+			while (searchIdx < str.getSizeInBytes()) {
 				int charBytes = numBytesForFirstByte(str.getByteOneSegment(searchIdx));
 				BinaryString currentChar = str.copyBinaryStringInOneSeg(searchIdx, charBytes);
 				// try to find the matching for the character in the trimString characters.
@@ -940,10 +945,10 @@ public class BinaryStringUtil {
 				}
 			}
 			// empty string
-			if (searchIdx >= str.sizeInBytes) {
+			if (searchIdx >= str.getSizeInBytes()) {
 				return EMPTY_UTF8;
 			} else {
-				return str.copyBinaryStringInOneSeg(searchIdx, str.sizeInBytes - searchIdx);
+				return str.copyBinaryStringInOneSeg(searchIdx, str.getSizeInBytes() - searchIdx);
 			}
 		} else {
 			return trimLeftSlow(str, trimStr);
@@ -952,9 +957,9 @@ public class BinaryStringUtil {
 
 	private static BinaryString trimLeftSlow(BinaryString str, BinaryString trimStr) {
 		int searchIdx = 0;
-		int segSize = str.segments[0].size();
+		int segSize = str.getSegments()[0].size();
 		BinaryString.SegmentAndOffset front = str.firstSegmentAndOffset(segSize);
-		while (searchIdx < str.sizeInBytes) {
+		while (searchIdx < str.getSizeInBytes()) {
 			int charBytes = numBytesForFirstByte(front.value());
 			BinaryString currentChar = str.copyBinaryString(searchIdx, searchIdx + charBytes - 1);
 			if (trimStr.contains(currentChar)) {
@@ -964,18 +969,18 @@ public class BinaryStringUtil {
 				break;
 			}
 		}
-		if (searchIdx == str.sizeInBytes) {
+		if (searchIdx == str.getSizeInBytes()) {
 			// empty string
 			return EMPTY_UTF8;
 		} else {
-			return str.copyBinaryString(searchIdx, str.sizeInBytes - 1);
+			return str.copyBinaryString(searchIdx, str.getSizeInBytes() - 1);
 		}
 	}
 
 	public static BinaryString trimRight(BinaryString str) {
 		str.ensureMaterialized();
 		if (str.inFirstSegment()) {
-			int e = str.sizeInBytes - 1;
+			int e = str.getSizeInBytes() - 1;
 			// skip all of the space (0x20) in the right side
 			while (e >= 0 && str.getByteOneSegment(e) == 0x20) {
 				e--;
@@ -993,8 +998,8 @@ public class BinaryStringUtil {
 	}
 
 	private static BinaryString trimRightSlow(BinaryString str) {
-		int e = str.sizeInBytes - 1;
-		int segSize = str.segments[0].size();
+		int e = str.getSizeInBytes() - 1;
+		int segSize = str.getSegments()[0].size();
 		BinaryString.SegmentAndOffset behind = str.lastSegmentAndOffset(segSize);
 		// skip all of the space (0x20) in the right side
 		while (e >= 0 && behind.value() == 0x20) {
@@ -1032,10 +1037,10 @@ public class BinaryStringUtil {
 			int charIdx = 0;
 			int byteIdx = 0;
 			// each element in charLens is length of character in the source string
-			int[] charLens = new int[str.sizeInBytes];
+			int[] charLens = new int[str.getSizeInBytes()];
 			// each element in charStartPos is start position of first byte in the source string
-			int[] charStartPos = new int[str.sizeInBytes];
-			while (byteIdx < str.sizeInBytes) {
+			int[] charStartPos = new int[str.getSizeInBytes()];
+			while (byteIdx < str.getSizeInBytes()) {
 				charStartPos[charIdx] = byteIdx;
 				charLens[charIdx] = numBytesForFirstByte(str.getByteOneSegment(byteIdx));
 				byteIdx += charLens[charIdx];
@@ -1043,7 +1048,7 @@ public class BinaryStringUtil {
 			}
 			// searchIdx points to the first character which is not in trim string from the right
 			// end.
-			int searchIdx = str.sizeInBytes - 1;
+			int searchIdx = str.getSizeInBytes() - 1;
 			charIdx -= 1;
 			while (charIdx >= 0) {
 				BinaryString currentChar = str.copyBinaryStringInOneSeg(
@@ -1069,13 +1074,13 @@ public class BinaryStringUtil {
 	private static BinaryString trimRightSlow(BinaryString str, BinaryString trimStr) {
 		int charIdx = 0;
 		int byteIdx = 0;
-		int segSize = str.segments[0].size();
+		int segSize = str.getSegments()[0].size();
 		BinaryString.SegmentAndOffset index = str.firstSegmentAndOffset(segSize);
 		// each element in charLens is length of character in the source string
-		int[] charLens = new int[str.sizeInBytes];
+		int[] charLens = new int[str.getSizeInBytes()];
 		// each element in charStartPos is start position of first byte in the source string
-		int[] charStartPos = new int[str.sizeInBytes];
-		while (byteIdx < str.sizeInBytes) {
+		int[] charStartPos = new int[str.getSizeInBytes()];
+		while (byteIdx < str.getSizeInBytes()) {
 			charStartPos[charIdx] = byteIdx;
 			int charBytes = numBytesForFirstByte(index.value());
 			charLens[charIdx] = charBytes;
@@ -1085,7 +1090,7 @@ public class BinaryStringUtil {
 		}
 		// searchIdx points to the first character which is not in trim string from the right
 		// end.
-		int searchIdx = str.sizeInBytes - 1;
+		int searchIdx = str.getSizeInBytes() - 1;
 		charIdx -= 1;
 		while (charIdx >= 0) {
 			BinaryString currentChar = str.copyBinaryString(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 8163f57..3a559b7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
 import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -68,7 +69,7 @@ public interface BinaryWriter {
 
 	void writeRow(int pos, BaseRow value, BaseRowSerializer type);
 
-	void writeGeneric(int pos, BinaryGeneric value);
+	void writeGeneric(int pos, BinaryGeneric value, BinaryGenericSerializer serializer);
 
 	/**
 	 * Finally, complete write to set real size to binary.
@@ -124,7 +125,7 @@ public interface BinaryWriter {
 				writer.writeRow(pos, (BaseRow) o, (BaseRowSerializer) serializer);
 				break;
 			case ANY:
-				writer.writeGeneric(pos, (BinaryGeneric) o);
+				writer.writeGeneric(pos, (BinaryGeneric) o, (BinaryGenericSerializer) serializer);
 				break;
 			case BINARY:
 			case VARBINARY:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index b6e6bb2..1185457 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -620,7 +620,7 @@ public class DataFormatConverters {
 
 		@Override
 		BinaryGeneric<T> toInternalImpl(T value) {
-			return new BinaryGeneric<>(value, serializer);
+			return new BinaryGeneric<>(value);
 		}
 
 		@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
index be29ee1..5f91820 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
@@ -17,7 +17,11 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.io.IOException;
 
 /**
  * Lazy binary format.
@@ -37,59 +41,77 @@ import org.apache.flink.core.memory.MemorySegment;
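- * It can lazy the conversions as much as possible. Only when it is needed can it be converted
- * into the required form.
+ * It defers the conversions as long as possible: a value is converted into the required form
+ * only when that form is actually needed.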
  */
-public abstract class LazyBinaryFormat<T> extends BinaryFormat {
+public abstract class LazyBinaryFormat<T> implements BinaryFormat {
 
-	protected T javaObject;
+	T javaObject;
+	BinarySection binarySection;
 
 	public LazyBinaryFormat() {
 		this(null, -1, -1, null);
 	}
 
 	public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
-		super(segments, offset, sizeInBytes);
-		this.javaObject = javaObject;
+		this(javaObject, new BinarySection(segments, offset, sizeInBytes));
 	}
 
 	public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
-		this(segments, offset, sizeInBytes, null);
+		this(null, new BinarySection(segments, offset, sizeInBytes));
 	}
 
 	public LazyBinaryFormat(T javaObject) {
-		this(null, -1, -1, javaObject);
+		this(javaObject, null);
+	}
+
+	public LazyBinaryFormat(T javaObject, BinarySection binarySection) {
+		this.javaObject = javaObject;
+		this.binarySection = binarySection;
 	}
 
 	public T getJavaObject() {
 		return javaObject;
 	}
 
+	/**
+	 * Must be public as it is used during code generation.
+	 */
 	public void setJavaObject(T javaObject) {
 		this.javaObject = javaObject;
 	}
 
 	@Override
 	public MemorySegment[] getSegments() {
-		ensureMaterialized();
-		return segments;
+		if (binarySection == null) {
+			throw new IllegalStateException("Lazy Binary Format was not materialized");
+		}
+		return binarySection.segments;
 	}
 
 	@Override
 	public int getOffset() {
-		ensureMaterialized();
-		return offset;
+		if (binarySection == null) {
+			throw new IllegalStateException("Lazy Binary Format was not materialized");
+		}
+		return binarySection.offset;
 	}
 
 	@Override
 	public int getSizeInBytes() {
-		ensureMaterialized();
-		return sizeInBytes;
+		if (binarySection == null) {
+			throw new IllegalStateException("Lazy Binary Format was not materialized");
+		}
+		return binarySection.sizeInBytes;
 	}
 
 	/**
 	 * Ensure we have materialized binary format.
 	 */
-	public void ensureMaterialized() {
-		if (segments == null) {
-			materialize();
+	public final void ensureMaterialized(TypeSerializer<T> serializer) {
+		if (binarySection == null) {
+			try {
+				this.binarySection = materialize(serializer);
+			} catch (IOException e) {
+				throw new WrappingRuntimeException(e);
+			}
 		}
 	}
 
@@ -98,24 +120,5 @@ public abstract class LazyBinaryFormat<T> extends BinaryFormat {
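-	 * Inherited classes need to hold the information they need.
-	 * (For example, BinaryGeneric needs javaObjectSerializer).
+	 * The serializer is passed in by the caller, so inherited classes
+	 * (for example, BinaryGeneric) do not need to hold it themselves.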
 	 */
-	public abstract void materialize();
-
-	@Override
-	public boolean equals(Object o) {
-		if (o != null && o instanceof LazyBinaryFormat) {
-			LazyBinaryFormat other = (LazyBinaryFormat) o;
-
-			ensureMaterialized();
-			other.ensureMaterialized();
-			return binaryEquals(other);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		ensureMaterialized();
-		return super.hashCode();
-	}
+	protected abstract BinarySection materialize(TypeSerializer<T> serializer) throws IOException;
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 252eec9..f31a138 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -21,6 +21,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 
+import static org.apache.flink.table.dataformat.BinaryFormat.readBinaryFieldFromSegments;
 import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthInBytes;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -32,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * has a possibility to cross the boundary of a segment, while the fixed-length part of {@link BinaryRow}
  * must fit into its first memory segment.
  */
-public final class NestedRow extends BinaryFormat implements BaseRow {
+public final class NestedRow extends BinarySection implements BaseRow {
 
 	private final int arity;
 	private final int nullBitsSizeInBytes;
@@ -224,7 +225,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
 		assertIndexIsValid(pos);
 		int fieldOffset = getFieldOffset(pos);
 		final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
-		return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+		return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
 	}
 
 	@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
index b0ca78f..29ba7eb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
@@ -19,18 +19,15 @@
 package org.apache.flink.table.runtime.typeutils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.dataformat.BinaryGeneric;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
-import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 
@@ -55,12 +52,19 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
 
 	@Override
 	public BinaryGeneric<T> createInstance() {
-		return new BinaryGeneric<>(serializer.createInstance(), serializer);
+		return new BinaryGeneric<>(serializer.createInstance());
 	}
 
 	@Override
 	public BinaryGeneric<T> copy(BinaryGeneric<T> from) {
-		return from.copy();
+		from.ensureMaterialized(serializer);
+		byte[] bytes = SegmentsUtil.copyToBytes(from.getSegments(), from.getOffset(), from.getSizeInBytes());
+		T newJavaObject = from.getJavaObject() == null ? null : serializer.copy(from.getJavaObject());
+		return new BinaryGeneric<>(
+			new MemorySegment[]{MemorySegmentFactory.wrap(bytes)},
+			0,
+			bytes.length,
+			newJavaObject);
 	}
 
 	@Override
@@ -75,6 +79,7 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
 
 	@Override
 	public void serialize(BinaryGeneric<T> record, DataOutputView target) throws IOException {
+		record.ensureMaterialized(serializer);
 		target.writeInt(record.getSizeInBytes());
 		SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
 	}
@@ -86,7 +91,8 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
 		source.readFully(bytes);
 		return new BinaryGeneric<>(
 				new MemorySegment[] {MemorySegmentFactory.wrap(bytes)},
-				0, bytes.length, serializer);
+				0,
+				bytes.length);
 	}
 
 	@Override
@@ -103,7 +109,7 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
 
 	@Override
 	public BinaryGenericSerializer<T> duplicate() {
-		return this;
+		return new BinaryGenericSerializer<>(serializer.duplicate());
 	}
 
 	@Override
@@ -127,63 +133,41 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
 
 	@Override
 	public TypeSerializerSnapshot<BinaryGeneric<T>> snapshotConfiguration() {
-		return new BinaryGenericSerializerSnapshot<>(serializer);
+		return new BinaryGenericSerializerSnapshot<>(this);
+	}
+
+	public TypeSerializer<T> getInnerSerializer() {
+		return serializer;
 	}
 
 	/**
 	 * {@link TypeSerializerSnapshot} for {@link BinaryGenericSerializer}.
 	 */
-	public static final class BinaryGenericSerializerSnapshot<T> implements TypeSerializerSnapshot<BinaryGeneric<T>> {
-		private static final int CURRENT_VERSION = 3;
-
-		private TypeSerializer<T> previousSerializer;
+	public static final class BinaryGenericSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<BinaryGeneric<T>, BinaryGenericSerializer<T>> {
 
 		@SuppressWarnings("unused")
 		public BinaryGenericSerializerSnapshot() {
-			// this constructor is used when restoring from a checkpoint/savepoint.
-		}
-
-		BinaryGenericSerializerSnapshot(TypeSerializer<T> serializer) {
-			this.previousSerializer = serializer;
+			super(BinaryGenericSerializer.class);
 		}
 
-		@Override
-		public int getCurrentVersion() {
-			return CURRENT_VERSION;
-		}
-
-		@Override
-		public void writeSnapshot(DataOutputView out) throws IOException {
-			InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousSerializer);
+		public BinaryGenericSerializerSnapshot(BinaryGenericSerializer<T> serializerInstance) {
+			super(serializerInstance);
 		}
 
 		@Override
-		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-			try {
-				this.previousSerializer = InstantiationUtil.deserializeObject(
-						new DataInputViewStream(in), userCodeClassLoader);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(e);
-			}
+		protected int getCurrentOuterSnapshotVersion() {
+			return 0;
 		}
 
 		@Override
-		public TypeSerializer<BinaryGeneric<T>> restoreSerializer() {
-			return new BinaryGenericSerializer<>(previousSerializer);
+		protected TypeSerializer<?>[] getNestedSerializers(BinaryGenericSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.serializer};
 		}
 
 		@Override
-		public TypeSerializerSchemaCompatibility<BinaryGeneric<T>> resolveSchemaCompatibility(TypeSerializer<BinaryGeneric<T>> newSerializer) {
-			if (!(newSerializer instanceof BinaryGenericSerializer)) {
-				return TypeSerializerSchemaCompatibility.incompatible();
-			}
-
-			BinaryGenericSerializer newBinaryGenericSerializer = (BinaryGenericSerializer) newSerializer;
-			if (!previousSerializer.equals(newBinaryGenericSerializer.serializer)) {
-				return TypeSerializerSchemaCompatibility.incompatible();
-			} else {
-				return TypeSerializerSchemaCompatibility.compatibleAsIs();
-			}
+		@SuppressWarnings("unchecked")
+		protected BinaryGenericSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new BinaryGenericSerializer<>((TypeSerializer<T>) nestedSerializers[0]);
 		}
 	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index a86e001..82019d0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
 import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -30,9 +31,11 @@ import org.junit.Test;
 
 import java.math.BigDecimal;
 
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -48,11 +51,13 @@ public class BaseRowTest {
 	private BinaryMap map;
 	private BinaryRow underRow;
 	private byte[] bytes;
+	private BinaryGenericSerializer<String> genericSerializer;
 
 	@Before
 	public void before() {
 		str = BinaryString.fromString("haha");
-		generic = new BinaryGeneric<>("haha", StringSerializer.INSTANCE);
+		generic = new BinaryGeneric<>("haha");
+		genericSerializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
 		decimal1 = Decimal.fromLong(10, 5, 0);
 		decimal2 = Decimal.fromBigDecimal(new BigDecimal(11), 20, 0);
 		array = new BinaryArray();
@@ -98,7 +103,7 @@ public class BaseRowTest {
 		writer.writeFloat(5, 5);
 		writer.writeDouble(6, 6);
 		writer.writeString(8, str);
-		writer.writeGeneric(9, generic);
+		writer.writeGeneric(9, generic, genericSerializer);
 		writer.writeDecimal(10, decimal1, 5);
 		writer.writeDecimal(11, decimal2, 20);
 		writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
@@ -193,7 +198,7 @@ public class BaseRowTest {
 		assertEquals(5, (int) row.getFloat(5));
 		assertEquals(6, (int) row.getDouble(6));
 		assertEquals(str, row.getString(8));
-		assertEquals(generic, row.getGeneric(9));
+		assertThat(row.getGeneric(9), equivalent(generic, genericSerializer));
 		assertEquals(decimal1, row.getDecimal(10, 5, 0));
 		assertEquals(decimal2, row.getDecimal(11, 20, 0));
 		assertEquals(array, row.getArray(12));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index f64907e..9de5cdf 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
 import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.runtime.util.SegmentsUtil;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -36,7 +37,9 @@ import org.junit.Test;
 import java.math.BigDecimal;
 
 import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -460,13 +463,14 @@ public class BinaryArrayTest {
 	public void testGeneric() {
 		BinaryArray array = new BinaryArray();
 		BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
-		BinaryGeneric<String> generic = new BinaryGeneric<>("hahah", StringSerializer.INSTANCE);
-		writer.writeGeneric(0, generic);
+		BinaryGeneric<String> generic = new BinaryGeneric<>("hahah");
+		BinaryGenericSerializer<String> serializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
+		writer.writeGeneric(0, generic, serializer);
 		writer.setNullAt(1);
 		writer.complete();
 
 		BinaryGeneric newGeneric = array.getGeneric(0);
-		assertEquals(generic, newGeneric);
+		assertThat(newGeneric, equivalent(generic, serializer));
 		assertTrue(array.isNullAt(1));
 	}
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index fa870b4..3a90aaa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
 import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -68,11 +69,13 @@ import java.util.Set;
 import static org.apache.flink.table.dataformat.BinaryString.fromBytes;
 import static org.apache.flink.table.dataformat.BinaryString.fromString;
 import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -444,18 +447,18 @@ public class BinaryRowTest {
 	public void testGeneric() {
 		BinaryRow row = new BinaryRow(3);
 		BinaryRowWriter writer = new BinaryRowWriter(row);
-		BinaryGeneric<String> hahah = new BinaryGeneric<>("hahah", StringSerializer.INSTANCE);
-		writer.writeGeneric(0, hahah);
+		BinaryGenericSerializer<String> binarySerializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
+		BinaryGeneric<String> hahah = new BinaryGeneric<>("hahah");
+		writer.writeGeneric(0, hahah, binarySerializer);
 		writer.setNullAt(1);
-		hahah.ensureMaterialized();
-		writer.writeGeneric(2, hahah);
+		writer.writeGeneric(2, hahah, binarySerializer);
 		writer.complete();
 
 		BinaryGeneric<String> generic0 = row.getGeneric(0);
-		assertEquals(hahah, generic0);
+		assertThat(generic0, equivalent(hahah, binarySerializer));
 		assertTrue(row.isNullAt(1));
 		BinaryGeneric<String> generic2 = row.getGeneric(2);
-		assertEquals(hahah, generic2);
+		assertThat(generic2, equivalent(hahah, binarySerializer));
 	}
 
 	@Test
@@ -619,18 +622,18 @@ public class BinaryRowTest {
 
 		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
 		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+		BinaryGenericSerializer<MyObj> binarySerializer = new BinaryGenericSerializer<>(genericSerializer);
 
 		BinaryRow row = new BinaryRow(4);
 		BinaryRowWriter writer = new BinaryRowWriter(row);
 		writer.writeInt(0, 0);
 
-		BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1), genericSerializer);
-		writer.writeGeneric(1, myObj1);
-		BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0), genericSerializer);
-		myObj2.ensureMaterialized();
-		writer.writeGeneric(2, myObj2);
-		BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1), genericSerializer);
-		writer.writeGeneric(3, myObj3);
+		BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1));
+		writer.writeGeneric(1, myObj1, binarySerializer);
+		BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0));
+		writer.writeGeneric(2, myObj2, binarySerializer);
+		BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1));
+		writer.writeGeneric(3, myObj3, binarySerializer);
 		writer.complete();
 
 		assertTestGenericObjectRow(row, genericSerializer);
@@ -670,12 +673,30 @@ public class BinaryRowTest {
 		LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
 
 		writer.writeInt(0, 0);
-		writer.writeGeneric(1, new BinaryGeneric<>(new Date(123), SqlDateSerializer.INSTANCE));
-		writer.writeGeneric(2, new BinaryGeneric<>(new Time(456), SqlTimeSerializer.INSTANCE));
-		writer.writeGeneric(3, new BinaryGeneric<>(new Timestamp(789), SqlTimestampSerializer.INSTANCE));
-		writer.writeGeneric(4, new BinaryGeneric<>(localDate, LocalDateSerializer.INSTANCE));
-		writer.writeGeneric(5, new BinaryGeneric<>(localTime, LocalTimeSerializer.INSTANCE));
-		writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, LocalDateTimeSerializer.INSTANCE));
+		writer.writeGeneric(
+			1,
+			new BinaryGeneric<>(new Date(123)),
+			new BinaryGenericSerializer<>(SqlDateSerializer.INSTANCE));
+		writer.writeGeneric(
+			2,
+			new BinaryGeneric<>(new Time(456)),
+			new BinaryGenericSerializer<>(SqlTimeSerializer.INSTANCE));
+		writer.writeGeneric(
+			3,
+			new BinaryGeneric<>(new Timestamp(789)),
+			new BinaryGenericSerializer<>(SqlTimestampSerializer.INSTANCE));
+		writer.writeGeneric(
+			4,
+			new BinaryGeneric<>(localDate),
+			new BinaryGenericSerializer<>(LocalDateSerializer.INSTANCE));
+		writer.writeGeneric(
+			5,
+			new BinaryGeneric<>(localTime),
+			new BinaryGenericSerializer<>(LocalTimeSerializer.INSTANCE));
+		writer.writeGeneric(
+			6,
+			new BinaryGeneric<>(localDateTime),
+			new BinaryGenericSerializer<>(LocalDateTimeSerializer.INSTANCE));
 		writer.complete();
 
 		assertEquals(new Date(123), BinaryGeneric.getJavaObjectFromBinaryGeneric(
@@ -781,7 +802,7 @@ public class BinaryRowTest {
 		random.nextBytes(bytes);
 		writer.writeBinary(0, bytes);
 		writer.reset();
-		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1)), new BinaryGenericSerializer<>(genericSerializer));
 		writer.complete();
 		int hash1 = row.hashCode();
 
@@ -789,7 +810,7 @@ public class BinaryRowTest {
 		random.nextBytes(bytes);
 		writer.writeBinary(0, bytes);
 		writer.reset();
-		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1)), new BinaryGenericSerializer<>(genericSerializer));
 		writer.complete();
 		int hash2 = row.hashCode();
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index 015c7ff..c6a7649 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -54,6 +54,7 @@ import static org.apache.flink.table.dataformat.BinaryStringUtil.trimRight;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -596,25 +597,25 @@ public class BinaryStringTest {
 	@Test
 	public void testEmptyString() {
 		BinaryString str2 = fromString("hahahahah");
-		BinaryString str3 = new BinaryString();
+		BinaryString str3;
 		{
 			MemorySegment[] segments = new MemorySegment[2];
 			segments[0] = MemorySegmentFactory.wrap(new byte[10]);
 			segments[1] = MemorySegmentFactory.wrap(new byte[10]);
-			str3.pointTo(segments, 15, 0);
+			str3 = BinaryString.fromAddress(segments, 15, 0);
 		}
 
 		assertTrue(BinaryString.EMPTY_UTF8.compareTo(str2) < 0);
 		assertTrue(str2.compareTo(BinaryString.EMPTY_UTF8) > 0);
 
-		assertTrue(BinaryString.EMPTY_UTF8.compareTo(str3) == 0);
-		assertTrue(str3.compareTo(BinaryString.EMPTY_UTF8) == 0);
+		assertEquals(0, BinaryString.EMPTY_UTF8.compareTo(str3));
+		assertEquals(0, str3.compareTo(BinaryString.EMPTY_UTF8));
 
-		assertFalse(BinaryString.EMPTY_UTF8.equals(str2));
-		assertFalse(str2.equals(BinaryString.EMPTY_UTF8));
+		assertNotEquals(BinaryString.EMPTY_UTF8, str2);
+		assertNotEquals(str2, BinaryString.EMPTY_UTF8);
 
-		assertTrue(BinaryString.EMPTY_UTF8.equals(str3));
-		assertTrue(str3.equals(BinaryString.EMPTY_UTF8));
+		assertEquals(BinaryString.EMPTY_UTF8, str3);
+		assertEquals(str3, BinaryString.EMPTY_UTF8);
 	}
 
 	@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
index 2ba0237..23e3713 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryStringSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.junit.Test;
@@ -137,7 +139,7 @@ public class NestedRowTest {
 		gRow.setField(1, 5L);
 		gRow.setField(2, BinaryString.fromString("12345678"));
 		gRow.setField(3, null);
-		gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), genericSerializer));
+		gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5)));
 
 		BaseRowSerializer serializer = new BaseRowSerializer(
 			new LogicalType[]{
@@ -150,9 +152,9 @@ public class NestedRowTest {
 			new TypeSerializer[]{
 				IntSerializer.INSTANCE,
 				LongSerializer.INSTANCE,
-				StringSerializer.INSTANCE,
-				StringSerializer.INSTANCE,
-				genericSerializer
+				BinaryStringSerializer.INSTANCE,
+				BinaryStringSerializer.INSTANCE,
+				new BinaryGenericSerializer<>(genericSerializer)
 			});
 		writer.writeRow(0, gRow, serializer);
 		writer.complete();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
index 100aef0..35bbc69 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
@@ -84,7 +84,7 @@ public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
 
 		MyObj inputObj = new MyObj(114514, 1919810);
 		BaseArray inputArray = new GenericArray(new BinaryGeneric[] {
-			new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config))
+			new BinaryGeneric<>(inputObj)
 		}, 1);
 
 		byte[] serialized;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
index afd1cf4..02c0b67 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
@@ -82,7 +82,7 @@ public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
 		int inputKey = 998244353;
 		MyObj inputObj = new MyObj(114514, 1919810);
 		Map<Object, Object> javaMap = new HashMap<>();
-		javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config)));
+		javaMap.put(inputKey, new BinaryGeneric<>(inputObj));
 		BaseMap inputMap = new GenericMap(javaMap);
 
 		byte[] serialized;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
index d472392..f35227c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
@@ -21,11 +21,22 @@ package org.apache.flink.table.runtime.typeutils;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.utils.BinaryGenericAsserter;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
 
 /**
  * A test for the {@link BinaryGenericSerializer}.
  */
 public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneric<String>> {
+	public BinaryGenericSerializerTest() { // hands the base class a DeeplyEqualsChecker carrying a custom check
+		super(new DeeplyEqualsChecker()
+			.withCustomCheck(
+				(o, o2) -> o instanceof BinaryGeneric && o2 instanceof BinaryGeneric, // true only if both are BinaryGenerics
+				(o, o2, checker) -> BinaryGenericAsserter.equivalent( // o2 is the matcher's expected value, o the matched item
+					(BinaryGeneric) o2,
+					new BinaryGenericSerializer<>(StringSerializer.INSTANCE)).matches(o)
+			));
+	}
 
 	@Override
 	protected BinaryGenericSerializer<String> createSerializer() {
@@ -45,11 +56,11 @@ public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneri
 	@Override
 	protected BinaryGeneric[] getTestData() {
 		return new BinaryGeneric[] {
-				new BinaryGeneric<>("1", StringSerializer.INSTANCE),
-				new BinaryGeneric<>("2", StringSerializer.INSTANCE),
-				new BinaryGeneric<>("3", StringSerializer.INSTANCE),
-				new BinaryGeneric<>("4", StringSerializer.INSTANCE),
-				new BinaryGeneric<>("5", StringSerializer.INSTANCE)
+				new BinaryGeneric<>("1"),
+				new BinaryGeneric<>("2"),
+				new BinaryGeneric<>("3"),
+				new BinaryGeneric<>("4"),
+				new BinaryGeneric<>("5")
 		};
 	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java
new file mode 100644
index 0000000..98af4a9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.runtime.util.SegmentsUtil;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.util.Arrays;
+
+/**
+ * A {@link org.hamcrest.Matcher} that checks {@link BinaryGeneric}s for equality of their binary forms.
+ */
+public class BinaryGenericAsserter extends TypeSafeMatcher<BinaryGeneric> {
+	private final BinaryGeneric expected;
+	private final BinaryGenericSerializer serializer; // its inner serializer is handed to ensureMaterialized
+
+	private BinaryGenericAsserter(
+			BinaryGeneric expected,
+			BinaryGenericSerializer serializer) {
+		this.expected = expected;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Creates a matcher that checks a {@link BinaryGeneric} against {@code expected} by comparing their
+	 * binary forms; the serializer's inner serializer is passed to {@code ensureMaterialized} on both objects.
+	 *
+	 * @param expected the expected object; {@code ensureMaterialized} is called on it immediately
+	 * @param serializer serializer whose inner serializer is used to materialize the underlying java object
+	 * @return binary equality matcher
+	 */
+	@SuppressWarnings("unchecked")
+	public static BinaryGenericAsserter equivalent(BinaryGeneric expected, BinaryGenericSerializer serializer) {
+		expected.ensureMaterialized(serializer.getInnerSerializer());
+		return new BinaryGenericAsserter(expected, serializer);
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected boolean matchesSafely(BinaryGeneric item) {
+		item.ensureMaterialized(serializer.getInnerSerializer());
+		expected.ensureMaterialized(serializer.getInnerSerializer()); // equivalent() has already called this on expected
+
+		return item.getSizeInBytes() == expected.getSizeInBytes() && // sizes first; && skips the byte comparison on mismatch
+			SegmentsUtil.equals(
+				item.getSegments(),
+				item.getOffset(),
+				expected.getSegments(),
+				expected.getOffset(),
+				item.getSizeInBytes()); // length to compare; equal to expected's size when this is reached
+	}
+
+	@Override
+	public void describeTo(Description description) { // describes expected as its bytes rendered by Arrays.toString
+		byte[] bytes = SegmentsUtil.getBytes(expected.getSegments(), expected.getOffset(), expected.getSizeInBytes());
+		description.appendText(Arrays.toString(bytes));
+	}
+}