You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:55 UTC

[flink] 03/03: [FLINK-13323][table-runtime-blink] Add tests for complex data formats

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f4ff4e775b13cac48ee3a369d4eff2368a49de7
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 19:55:48 2019 +0800

    [FLINK-13323][table-runtime-blink] Add tests for complex data formats
---
 .../flink/table/dataformat/BinaryRowTest.java      | 539 ++++++++++++++++++++-
 .../flink/table/dataformat/DataFormatTestUtil.java |  68 ++-
 .../apache/flink/table/dataformat/DecimalTest.java |   8 +
 .../apache/flink/table/util/SegmentsUtilTest.java  | 149 +++++-
 4 files changed, 751 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index b4a6387..6351e78 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -18,31 +18,60 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.RandomAccessInputView;
 import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
 import org.apache.flink.table.typeutils.BaseRowSerializer;
 import org.apache.flink.table.typeutils.BinaryRowSerializer;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
 import static org.apache.flink.table.dataformat.BinaryString.fromBytes;
 import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -61,7 +90,7 @@ public class BinaryRowTest {
 		MemorySegment segment = MemorySegmentFactory.wrap(new byte[100]);
 		BinaryRow row = new BinaryRow(2);
 		row.pointTo(segment, 10, 48);
-		assertTrue(row.getSegments()[0] == segment);
+		assertSame(row.getSegments()[0], segment);
 		row.setInt(0, 5);
 		row.setDouble(1, 5.8D);
 	}
@@ -85,7 +114,7 @@ public class BinaryRowTest {
 		assertTrue(row.isNullAt(0));
 		assertEquals(55, row.getShort(5));
 		assertEquals(22, row.getLong(2));
-		assertEquals(true, row.getBoolean(4));
+		assertTrue(row.getBoolean(4));
 		assertEquals((byte) 66, row.getByte(6));
 		assertEquals(77f, row.getFloat(7), 0);
 	}
@@ -131,7 +160,7 @@ public class BinaryRowTest {
 	}
 
 	@Test
-	public void testWriteString() throws IOException {
+	public void testWriteString() {
 		{
 			// litter byte[]
 			BinaryRow row = new BinaryRow(1);
@@ -242,7 +271,7 @@ public class BinaryRowTest {
 		assertEquals((byte) 99, row.getByte(2));
 		assertEquals(87.1d, row.getDouble(6), 0);
 		assertEquals(26.1f, row.getFloat(7), 0);
-		assertEquals(true, row.getBoolean(1));
+		assertTrue(row.getBoolean(1));
 		assertEquals("1234567", row.getString(3).toString());
 		assertEquals("12345678", row.getString(5).toString());
 		assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString());
@@ -269,7 +298,7 @@ public class BinaryRowTest {
 	}
 
 	@Test
-	public void anyNullTest() throws IOException {
+	public void anyNullTest() {
 		{
 			BinaryRow row = new BinaryRow(3);
 			BinaryRowWriter writer = new BinaryRowWriter(row);
@@ -302,7 +331,7 @@ public class BinaryRowTest {
 	}
 
 	@Test
-	public void testSingleSegmentBinaryRowHashCode() throws IOException {
+	public void testSingleSegmentBinaryRowHashCode() {
 		final Random rnd = new Random(System.currentTimeMillis());
 		// test hash stabilization
 		BinaryRow row = new BinaryRow(13);
@@ -347,7 +376,7 @@ public class BinaryRowTest {
 	}
 
 	@Test
-	public void testHeaderSize() throws IOException {
+	public void testHeaderSize() {
 		assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56));
 		assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57));
 		assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120));
@@ -355,7 +384,7 @@ public class BinaryRowTest {
 	}
 
 	@Test
-	public void testHeader() throws IOException {
+	public void testHeader() {
 		BinaryRow row = new BinaryRow(2);
 		BinaryRowWriter writer = new BinaryRowWriter(row);
 
@@ -456,4 +485,498 @@ public class BinaryRowTest {
 		Assert.assertArrayEquals(bytes1, row.getBinary(0));
 		Assert.assertArrayEquals(bytes2, row.getBinary(1));
 	}
+
+	@Test
+	public void testBinaryArray() {
+		// 1. array test
+		BinaryArray array = new BinaryArray();
+		BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
+			array, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+
+		arrayWriter.writeInt(0, 6);
+		arrayWriter.setNullInt(1);
+		arrayWriter.writeInt(2, 666);
+		arrayWriter.complete();
+
+		assertEquals(array.getInt(0), 6);
+		assertTrue(array.isNullAt(1));
+		assertEquals(array.getInt(2), 666);
+
+		// 2. test write array to binary row
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+		BaseArraySerializer serializer = new BaseArraySerializer(
+			DataTypes.INT().getLogicalType(), new ExecutionConfig());
+		rowWriter.writeArray(0, array, serializer);
+		rowWriter.complete();
+
+		BinaryArray array2 = (BinaryArray) row.getArray(0);
+		assertEquals(array, array2);
+		assertEquals(6, array2.getInt(0));
+		assertTrue(array2.isNullAt(1));
+		assertEquals(666, array2.getInt(2));
+	}
+
+	@Test
+	public void testGenericArray() {
+		// 1. array test
+		Integer[] javaArray = {6, null, 666};
+		GenericArray array = new GenericArray(javaArray, 3, false);
+
+		assertEquals(array.getInt(0), 6);
+		assertTrue(array.isNullAt(1));
+		assertEquals(array.getInt(2), 666);
+
+		// 2. test write array to binary row
+		BinaryRow row2 = new BinaryRow(1);
+		BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+		BaseArraySerializer serializer = new BaseArraySerializer(
+			DataTypes.INT().getLogicalType(), new ExecutionConfig());
+		writer2.writeArray(0, array, serializer);
+		writer2.complete();
+
+		BaseArray array2 = row2.getArray(0);
+		assertEquals(6, array2.getInt(0));
+		assertTrue(array2.isNullAt(1));
+		assertEquals(666, array2.getInt(2));
+	}
+
+	@Test
+	public void testBinaryMap() {
+		BinaryArray array1 = new BinaryArray();
+		BinaryArrayWriter writer1 = new BinaryArrayWriter(
+			array1, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+		writer1.writeInt(0, 6);
+		writer1.writeInt(1, 5);
+		writer1.writeInt(2, 666);
+		writer1.writeInt(3, 0);
+		writer1.complete();
+
+		BinaryArray array2 = new BinaryArray();
+		BinaryArrayWriter writer2 = new BinaryArrayWriter(
+			array2, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
+		writer2.writeString(0, BinaryString.fromString("6"));
+		writer2.writeString(1, BinaryString.fromString("5"));
+		writer2.writeString(2, BinaryString.fromString("666"));
+		writer2.setNullAt(3, DataTypes.STRING().getLogicalType());
+		writer2.complete();
+
+		BinaryMap binaryMap = BinaryMap.valueOf(array1, array2);
+
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+		BaseMapSerializer serializer = new BaseMapSerializer(
+			DataTypes.STRING().getLogicalType(),
+			DataTypes.INT().getLogicalType(),
+			new ExecutionConfig());
+		rowWriter.writeMap(0, binaryMap, serializer);
+		rowWriter.complete();
+
+		BinaryMap map = (BinaryMap) row.getMap(0);
+		BinaryArray key = map.keyArray();
+		BinaryArray value = map.valueArray();
+
+		assertEquals(binaryMap, map);
+		assertEquals(array1, key);
+		assertEquals(array2, value);
+
+		assertEquals(5, key.getInt(1));
+		assertEquals(BinaryString.fromString("5"), value.getString(1));
+		assertEquals(0, key.getInt(3));
+		assertTrue(value.isNullAt(3));
+	}
+
+	@Test
+	public void testGenericMap() {
+		Map javaMap = new HashMap();
+		javaMap.put(6, BinaryString.fromString("6"));
+		javaMap.put(5, BinaryString.fromString("5"));
+		javaMap.put(666, BinaryString.fromString("666"));
+		javaMap.put(0, null);
+
+		GenericMap genericMap = new GenericMap(javaMap);
+
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+		BaseMapSerializer serializer = new BaseMapSerializer(
+			DataTypes.INT().getLogicalType(),
+			DataTypes.STRING().getLogicalType(),
+			new ExecutionConfig());
+		rowWriter.writeMap(0, genericMap, serializer);
+		rowWriter.complete();
+
+		Map map = row.getMap(0).toJavaMap(DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType());
+		assertEquals(BinaryString.fromString("6"), map.get(6));
+		assertEquals(BinaryString.fromString("5"), map.get(5));
+		assertEquals(BinaryString.fromString("666"), map.get(666));
+		assertTrue(map.containsKey(0));
+		assertNull(map.get(0));
+	}
+
+	@Test
+	public void testGenericObject() throws Exception {
+
+		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+		BinaryRow row = new BinaryRow(4);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		writer.writeInt(0, 0);
+
+		BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1), genericSerializer);
+		writer.writeGeneric(1, myObj1);
+		BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0), genericSerializer);
+		myObj2.ensureMaterialized();
+		writer.writeGeneric(2, myObj2);
+		BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1), genericSerializer);
+		writer.writeGeneric(3, myObj3);
+		writer.complete();
+
+		assertTestGenericObjectRow(row, genericSerializer);
+
+		// getBytes from var-length memorySegments.
+		BinaryRowSerializer serializer = new BinaryRowSerializer(4);
+		MemorySegment[] memorySegments = new MemorySegment[3];
+		ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
+		for (int i = 0; i < 3; i++) {
+			memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
+			memorySegmentList.add(memorySegments[i]);
+		}
+		RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
+		serializer.serializeToPages(row, out);
+
+		BinaryRow mapRow = serializer.mapFromPages(new RandomAccessInputView(memorySegmentList, 64));
+		assertTestGenericObjectRow(mapRow, genericSerializer);
+	}
+
+	private void assertTestGenericObjectRow(BinaryRow row, TypeSerializer<MyObj> serializer) {
+		assertEquals(0, row.getInt(0));
+		BinaryGeneric<MyObj> binaryGeneric1 = row.getGeneric(1);
+		BinaryGeneric<MyObj> binaryGeneric2 = row.getGeneric(2);
+		BinaryGeneric<MyObj> binaryGeneric3 = row.getGeneric(3);
+		assertEquals(new MyObj(0, 1), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric1, serializer));
+		assertEquals(new MyObj(123, 5.0), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric2, serializer));
+		assertEquals(new MyObj(1, 1), BinaryGeneric.getJavaObjectFromBinaryGeneric(binaryGeneric3, serializer));
+	}
+
+	@Test
+	public void testDateAndTimeAsGenericObject() {
+		BinaryRow row = new BinaryRow(7);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+
+		LocalDate localDate = LocalDate.of(2019, 7, 16);
+		LocalTime localTime = LocalTime.of(17, 31);
+		LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
+
+		writer.writeInt(0, 0);
+		writer.writeGeneric(1, new BinaryGeneric<>(new Date(123), SqlDateSerializer.INSTANCE));
+		writer.writeGeneric(2, new BinaryGeneric<>(new Time(456), SqlTimeSerializer.INSTANCE));
+		writer.writeGeneric(3, new BinaryGeneric<>(new Timestamp(789), SqlTimestampSerializer.INSTANCE));
+		writer.writeGeneric(4, new BinaryGeneric<>(localDate, LocalDateSerializer.INSTANCE));
+		writer.writeGeneric(5, new BinaryGeneric<>(localTime, LocalTimeSerializer.INSTANCE));
+		writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, LocalDateTimeSerializer.INSTANCE));
+		writer.complete();
+
+		assertEquals(new Date(123), BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(1), SqlDateSerializer.INSTANCE));
+		assertEquals(new Time(456), BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(2), SqlTimeSerializer.INSTANCE));
+		assertEquals(new Timestamp(789), BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(3), SqlTimestampSerializer.INSTANCE));
+		assertEquals(localDate, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(4), LocalDateSerializer.INSTANCE));
+		assertEquals(localTime, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(5), LocalTimeSerializer.INSTANCE));
+		assertEquals(localDateTime, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+			row.getGeneric(6), LocalDateTimeSerializer.INSTANCE));
+	}
+
+	@Test
+	public void testSerializeVariousSize() throws IOException {
+		// in this test, we are going to start serializing from the i-th byte (i in 0...`segSize`)
+		// and the size of the row we're going to serialize is j bytes
+		// (j in `rowFixLength` to the maximum length we can write)
+
+		int segSize = 64;
+		int segTotalNumber = 3;
+
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		Random random = new Random();
+		byte[] bytes = new byte[1024];
+		random.nextBytes(bytes);
+		writer.writeBinary(0, bytes);
+		writer.complete();
+
+		MemorySegment[] memorySegments = new MemorySegment[segTotalNumber];
+		Map<MemorySegment, Integer> msIndex = new HashMap<>();
+		for (int i = 0; i < segTotalNumber; i++) {
+			memorySegments[i] = MemorySegmentFactory.wrap(new byte[segSize]);
+			msIndex.put(memorySegments[i], i);
+		}
+
+		BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+		int rowSizeInt = 4;
+		// note that as there is only one field in the row, the fixed-length part is 16 bytes (header + 1 field)
+		int rowFixLength = 16;
+		for (int i = 0; i < segSize; i++) {
+			// this is the maximum row size we can serialize
+			// if we are going to serialize from the i-th byte of the input view
+			int maxRowSize = (segSize * segTotalNumber) - i - rowSizeInt;
+			if (segSize - i < rowFixLength + rowSizeInt) {
+				// oops, we can't write the whole fixed-length part in the first segment
+				// because the remaining space is too small, so we have to start serializing from the second segment.
+				// when serializing, we need to first write the length of the row,
+				// then write the fixed-length part of the row.
+				maxRowSize -= segSize - i;
+			}
+			for (int j = rowFixLength; j < maxRowSize; j++) {
+				// ok, now we're going to serialize a row of j bytes
+				testSerialize(row, memorySegments, msIndex, serializer, i, j);
+			}
+		}
+	}
+
+	private void testSerialize(
+		BinaryRow row, MemorySegment[] memorySegments,
+		Map<MemorySegment, Integer> msIndex, BinaryRowSerializer serializer, int position,
+		int rowSize) throws IOException {
+		RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
+		out.skipBytesToWrite(position);
+		row.setTotalSize(rowSize);
+
+		// this `row` contains random bytes, and now we're going to serialize `rowSize` bytes
+		// (not including the row header) of the contents
+		serializer.serializeToPages(row, out);
+
+		// let's see how many segments we have written
+		int segNumber = msIndex.get(out.getCurrentSegment()) + 1;
+		int lastSegSize = out.getCurrentPositionInSegment();
+
+		// now deserialize from the written segments
+		ArrayList<MemorySegment> segments = new ArrayList<>(Arrays.asList(memorySegments).subList(0, segNumber));
+		RandomAccessInputView input = new RandomAccessInputView(segments, 64, lastSegSize);
+		input.skipBytesToRead(position);
+		BinaryRow mapRow = serializer.mapFromPages(input);
+
+		assertEquals(row, mapRow);
+	}
+
+	@Test
+	public void testZeroOutPaddingGeneric() {
+
+		GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
+		TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+
+		Random random = new Random();
+		byte[] bytes = new byte[1024];
+
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+
+		// let's random the bytes
+		writer.reset();
+		random.nextBytes(bytes);
+		writer.writeBinary(0, bytes);
+		writer.reset();
+		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+		writer.complete();
+		int hash1 = row.hashCode();
+
+		writer.reset();
+		random.nextBytes(bytes);
+		writer.writeBinary(0, bytes);
+		writer.reset();
+		writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+		writer.complete();
+		int hash2 = row.hashCode();
+
+		assertEquals(hash1, hash2);
+	}
+
+	@Test
+	public void testZeroOutPaddingString() {
+
+		Random random = new Random();
+		byte[] bytes = new byte[1024];
+
+		BinaryRow row = new BinaryRow(1);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+
+		writer.reset();
+		random.nextBytes(bytes);
+		writer.writeBinary(0, bytes);
+		writer.reset();
+		writer.writeString(0, BinaryString.fromString("wahahah"));
+		writer.complete();
+		int hash1 = row.hashCode();
+
+		writer.reset();
+		random.nextBytes(bytes);
+		writer.writeBinary(0, bytes);
+		writer.reset();
+		writer.writeString(0, BinaryString.fromString("wahahah"));
+		writer.complete();
+		int hash2 = row.hashCode();
+
+		assertEquals(hash1, hash2);
+	}
+
+	@Test
+	public void testHashAndCopy() throws IOException {
+		MemorySegment[] segments = new MemorySegment[3];
+		for (int i = 0; i < 3; i++) {
+			segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+		}
+		RandomAccessOutputView out = new RandomAccessOutputView(segments, 64);
+		BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+		BinaryRow row = new BinaryRow(2);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		writer.writeString(0, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahah"));
+		writer.writeString(1, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahaa"));
+		writer.complete();
+		serializer.serializeToPages(row, out);
+
+		ArrayList<MemorySegment> segmentList = new ArrayList<>(Arrays.asList(segments));
+		RandomAccessInputView input = new RandomAccessInputView(segmentList, 64, 64);
+
+		BinaryRow mapRow = serializer.mapFromPages(input);
+		assertEquals(row, mapRow);
+		assertEquals(row.getString(0), mapRow.getString(0));
+		assertEquals(row.getString(1), mapRow.getString(1));
+		assertNotEquals(row.getString(0), mapRow.getString(1));
+
+		// test if the hash code before and after serialization are the same
+		assertEquals(row.hashCode(), mapRow.hashCode());
+		assertEquals(row.getString(0).hashCode(), mapRow.getString(0).hashCode());
+		assertEquals(row.getString(1).hashCode(), mapRow.getString(1).hashCode());
+
+		// test if the copy method produce a row with the same contents
+		assertEquals(row.copy(), mapRow.copy());
+		assertEquals(row.getString(0).copy(), mapRow.getString(0).copy());
+		assertEquals(row.getString(1).copy(), mapRow.getString(1).copy());
+	}
+
+	@Test
+	public void testSerStringToKryo() throws IOException {
+		KryoSerializer<BinaryString> serializer = new KryoSerializer<>(
+			BinaryString.class, new ExecutionConfig());
+
+		BinaryString string = BinaryString.fromString("hahahahaha");
+		RandomAccessOutputView out = new RandomAccessOutputView(
+			new MemorySegment[]{MemorySegmentFactory.wrap(new byte[1024])}, 64);
+		serializer.serialize(string, out);
+
+		RandomAccessInputView input = new RandomAccessInputView(
+			new ArrayList<>(Collections.singletonList(out.getCurrentSegment())), 64, 64);
+		BinaryString newStr = serializer.deserialize(input);
+
+		assertEquals(string, newStr);
+	}
+
+	@Test
+	public void testSerializerPages() throws IOException {
+		// Boundary tests
+		BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+		BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+		testSerializerPagesInternal(row24, row160);
+		testSerializerPagesInternal(row24, DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160));
+	}
+
+	private void testSerializerPagesInternal(BinaryRow row24, BinaryRow row160) throws IOException {
+		BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+		// 1. test middle row with just on the edge1
+		{
+			MemorySegment[] segments = new MemorySegment[4];
+			for (int i = 0; i < segments.length; i++) {
+				segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+			}
+			RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
+			serializer.serializeToPages(row24, out);
+			serializer.serializeToPages(row160, out);
+			serializer.serializeToPages(row24, out);
+
+			RandomAccessInputView in = new RandomAccessInputView(
+				new ArrayList<>(Arrays.asList(segments)),
+				segments[0].size(),
+				out.getCurrentPositionInSegment());
+
+			BinaryRow retRow = new BinaryRow(2);
+			List<BinaryRow> rets = new ArrayList<>();
+			while (true) {
+				try {
+					retRow = serializer.mapFromPages(retRow, in);
+				} catch (EOFException e) {
+					break;
+				}
+				rets.add(retRow.copy());
+			}
+			assertEquals(row24, rets.get(0));
+			assertEquals(row160, rets.get(1));
+			assertEquals(row24, rets.get(2));
+		}
+
+		// 2. test middle row with just on the edge2
+		{
+			MemorySegment[] segments = new MemorySegment[7];
+			for (int i = 0; i < segments.length; i++) {
+				segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+			}
+			RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
+			serializer.serializeToPages(row24, out);
+			serializer.serializeToPages(row160, out);
+			serializer.serializeToPages(row160, out);
+
+			RandomAccessInputView in = new RandomAccessInputView(
+				new ArrayList<>(Arrays.asList(segments)),
+				segments[0].size(),
+				out.getCurrentPositionInSegment());
+
+			BinaryRow retRow = new BinaryRow(2);
+			List<BinaryRow> rets = new ArrayList<>();
+			while (true) {
+				try {
+					retRow = serializer.mapFromPages(retRow, in);
+				} catch (EOFException e) {
+					break;
+				}
+				rets.add(retRow.copy());
+			}
+			assertEquals(row24, rets.get(0));
+			assertEquals(row160, rets.get(1));
+			assertEquals(row160, rets.get(2));
+		}
+
+		// 3. test last row with just on the edge
+		{
+			MemorySegment[] segments = new MemorySegment[3];
+			for (int i = 0; i < segments.length; i++) {
+				segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+			}
+			RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
+			serializer.serializeToPages(row24, out);
+			serializer.serializeToPages(row160, out);
+
+			RandomAccessInputView in = new RandomAccessInputView(
+				new ArrayList<>(Arrays.asList(segments)),
+				segments[0].size(),
+				out.getCurrentPositionInSegment());
+
+			BinaryRow retRow = new BinaryRow(2);
+			List<BinaryRow> rets = new ArrayList<>();
+			while (true) {
+				try {
+					retRow = serializer.mapFromPages(retRow, in);
+				} catch (EOFException e) {
+					break;
+				}
+				rets.add(retRow.copy());
+			}
+			assertEquals(row24, rets.get(0));
+			assertEquals(row160, rets.get(1));
+		}
+	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
index d89fa5b..11d3f11 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -20,10 +20,76 @@ package org.apache.flink.table.dataformat;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * Utils for testing data formats.
  */
-class DataFormatTestUtil {
+public class DataFormatTestUtil {
+
+	/**
+	 * Get a binary row of 24 bytes long.
+	 */
+	public static BinaryRow get24BytesBinaryRow() {
+		// header (8 bytes) + 2 * string in fixed-length part (8 bytes each)
+		BinaryRow row = new BinaryRow(2);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		writer.writeString(0, BinaryString.fromString(RandomStringUtils.randomNumeric(2)));
+		writer.writeString(1, BinaryString.fromString(RandomStringUtils.randomNumeric(2)));
+		writer.complete();
+		return row;
+	}
+
+	/**
+	 * Get a binary row of 160 bytes long.
+	 */
+	public static BinaryRow get160BytesBinaryRow() {
+		// header (8 bytes) +
+		// 72 byte length string (8 bytes in fixed-length, 72 bytes in variable-length) +
+		// 64 byte length string (8 bytes in fixed-length, 64 bytes in variable-length)
+		BinaryRow row = new BinaryRow(2);
+		BinaryRowWriter writer = new BinaryRowWriter(row);
+		writer.writeString(0, BinaryString.fromString(RandomStringUtils.randomNumeric(72)));
+		writer.writeString(1, BinaryString.fromString(RandomStringUtils.randomNumeric(64)));
+		writer.complete();
+		return row;
+	}
+
+	/**
+	 * Get a binary row consisting of 6 segments.
+	 * The bytes of the returned row is the same with the given input binary row.
+	 */
+	public static BinaryRow getMultiSeg160BytesBinaryRow(BinaryRow row160) {
+		BinaryRow multiSegRow160 = new BinaryRow(2);
+		MemorySegment[] segments = new MemorySegment[6];
+		int baseOffset = 8;
+		int posInSeg = baseOffset;
+		int remainSize = 160;
+		for (int i = 0; i < segments.length; i++) {
+			segments[i] = MemorySegmentFactory.wrap(new byte[32]);
+			int copy = Math.min(32 - posInSeg, remainSize);
+			row160.getSegments()[0].copyTo(160 - remainSize, segments[i], posInSeg, copy);
+			remainSize -= copy;
+			posInSeg = 0;
+		}
+		multiSegRow160.pointTo(segments, baseOffset, 160);
+		assertEquals(row160, multiSegRow160);
+		return multiSegRow160;
+	}
+
+	/**
+	 * Get a binary row consisting of 2 segments.
+	 * Its first segment is the same with the given input binary row, while its second segment is empty.
+	 */
+	public static BinaryRow getMultiSeg160BytesInOneSegRow(BinaryRow row160) {
+		MemorySegment[] segments = new MemorySegment[2];
+		segments[0] = row160.getSegments()[0];
+		segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]);
+		row160.pointTo(segments, 0, row160.getSizeInBytes());
+		return row160;
+	}
 
 	/**
 	 * Split the given byte array into two memory segments.
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
index e19299c..e66351a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
@@ -114,4 +114,12 @@ public class DecimalTest {
 		Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue());
 		Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue());
 	}
+
+	@Test
+	public void testToString() {
+		String val = "0.0000000000000000001";
+		Assert.assertEquals(val, Decimal.castFrom(val, 39, val.length() - 2).toString());
+		val = "123456789012345678901234567890123456789";
+		Assert.assertEquals(val, Decimal.castFrom(val, 39, 0).toString());
+	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
index c1808bc..a8355f1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
@@ -19,11 +19,20 @@ package org.apache.flink.table.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRow;
 import org.apache.flink.table.dataformat.BinaryRowTest;
+import org.apache.flink.table.dataformat.DataFormatTestUtil;
+import org.apache.flink.table.dataformat.util.BinaryRowUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest},
  * this just test some boundary scenarios testing.
@@ -53,10 +62,142 @@ public class SegmentsUtilTest {
 		segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2, 5});
 		segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15, 18});
 
-		Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0));
-		Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3));
-		Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6));
-		Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7));
+		assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0));
+		assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3));
+		assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6));
+		assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7));
+	}
+
+	@Test
+	public void testBoundaryByteArrayEquals() {
+		byte[] bytes1 = new byte[5];
+		bytes1[3] = 81;
+		byte[] bytes2 = new byte[100];
+		bytes2[3] = 81;
+		bytes2[4] = 81;
+
+		assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 4));
+		assertFalse(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 5));
+		assertTrue(BinaryRowUtil.byteArrayEquals(bytes1, bytes2, 0));
+	}
+
+	@Test
+	public void testBoundaryEquals() {
+		BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+		BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+		BinaryRow varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+		BinaryRow varRow160InOne = DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
+
+		assertEquals(row160, varRow160InOne);
+		assertEquals(varRow160, varRow160InOne);
+		assertEquals(row160, varRow160);
+		assertEquals(varRow160InOne, varRow160);
+
+		assertNotEquals(row24, row160);
+		assertNotEquals(row24, varRow160);
+		assertNotEquals(row24, varRow160InOne);
+
+		assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0));
+		assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0));
+
+		// test var segs
+		MemorySegment[] segments1 = new MemorySegment[2];
+		segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+		MemorySegment[] segments2 = new MemorySegment[3];
+		segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+		segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+		segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+		segments1[0].put(9, (byte) 1);
+		assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 14, 14));
+		segments2[1].put(7, (byte) 1);
+		assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 14, 14));
+		assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 14));
+		assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 16));
+
+		segments2[2].put(7, (byte) 1);
+		assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 32, 14));
+	}
+
+	@Test
+	public void testBoundaryCopy() {
+		MemorySegment[] segments1 = new MemorySegment[2];
+		segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[0].put(15, (byte) 5);
+		segments1[1].put(15, (byte) 6);
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToBytes(segments1, 0, bytes, 0, 64);
+			assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64));
+		}
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToBytes(segments1, 32, bytes, 0, 14);
+			assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14));
+		}
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToBytes(segments1, 34, bytes, 0, 14);
+			assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14));
+		}
+	}
+
+	@Test
+	public void testCopyToUnsafe() {
+		MemorySegment[] segments1 = new MemorySegment[2];
+		segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[0].put(15, (byte) 5);
+		segments1[1].put(15, (byte) 6);
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64);
+			assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64));
+		}
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+			assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14));
+		}
+
+		{
+			byte[] bytes = new byte[64];
+			MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+			SegmentsUtil.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+			assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14));
+		}
+	}
+
+	@Test
+	public void testFind() {
+		MemorySegment[] segments1 = new MemorySegment[2];
+		segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+		segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+		MemorySegment[] segments2 = new MemorySegment[3];
+		segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+		segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+		segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+		assertEquals(34, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 0));
+		assertEquals(-1, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 15));
 	}
 
 }