You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 02:10:55 UTC
[flink] 03/03: [FLINK-13323][table-runtime-blink] Add tests for
complex data formats
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f4ff4e775b13cac48ee3a369d4eff2368a49de7
Author: TsReaper <ts...@gmail.com>
AuthorDate: Thu Jul 18 19:55:48 2019 +0800
[FLINK-13323][table-runtime-blink] Add tests for complex data formats
---
.../flink/table/dataformat/BinaryRowTest.java | 539 ++++++++++++++++++++-
.../flink/table/dataformat/DataFormatTestUtil.java | 68 ++-
.../apache/flink/table/dataformat/DecimalTest.java | 8 +
.../apache/flink/table/util/SegmentsUtilTest.java | 149 +++++-
4 files changed, 751 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index b4a6387..6351e78 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -18,31 +18,60 @@
package org.apache.flink.table.dataformat;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.LocalTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlDateSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimeSerializer;
+import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.typeutils.BaseArraySerializer;
+import org.apache.flink.table.typeutils.BaseMapSerializer;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.junit.Assert;
import org.junit.Test;
+import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import static org.apache.flink.table.dataformat.BinaryString.fromBytes;
import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/**
@@ -61,7 +90,7 @@ public class BinaryRowTest {
MemorySegment segment = MemorySegmentFactory.wrap(new byte[100]);
BinaryRow row = new BinaryRow(2);
row.pointTo(segment, 10, 48);
- assertTrue(row.getSegments()[0] == segment);
+ assertSame(row.getSegments()[0], segment);
row.setInt(0, 5);
row.setDouble(1, 5.8D);
}
@@ -85,7 +114,7 @@ public class BinaryRowTest {
assertTrue(row.isNullAt(0));
assertEquals(55, row.getShort(5));
assertEquals(22, row.getLong(2));
- assertEquals(true, row.getBoolean(4));
+ assertTrue(row.getBoolean(4));
assertEquals((byte) 66, row.getByte(6));
assertEquals(77f, row.getFloat(7), 0);
}
@@ -131,7 +160,7 @@ public class BinaryRowTest {
}
@Test
- public void testWriteString() throws IOException {
+ public void testWriteString() {
{
// litter byte[]
BinaryRow row = new BinaryRow(1);
@@ -242,7 +271,7 @@ public class BinaryRowTest {
assertEquals((byte) 99, row.getByte(2));
assertEquals(87.1d, row.getDouble(6), 0);
assertEquals(26.1f, row.getFloat(7), 0);
- assertEquals(true, row.getBoolean(1));
+ assertTrue(row.getBoolean(1));
assertEquals("1234567", row.getString(3).toString());
assertEquals("12345678", row.getString(5).toString());
assertEquals("啦啦啦啦啦我是快乐的粉刷匠", row.getString(9).toString());
@@ -269,7 +298,7 @@ public class BinaryRowTest {
}
@Test
- public void anyNullTest() throws IOException {
+ public void anyNullTest() {
{
BinaryRow row = new BinaryRow(3);
BinaryRowWriter writer = new BinaryRowWriter(row);
@@ -302,7 +331,7 @@ public class BinaryRowTest {
}
@Test
- public void testSingleSegmentBinaryRowHashCode() throws IOException {
+ public void testSingleSegmentBinaryRowHashCode() {
final Random rnd = new Random(System.currentTimeMillis());
// test hash stabilization
BinaryRow row = new BinaryRow(13);
@@ -347,7 +376,7 @@ public class BinaryRowTest {
}
@Test
- public void testHeaderSize() throws IOException {
+ public void testHeaderSize() {
assertEquals(8, BinaryRow.calculateBitSetWidthInBytes(56));
assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(57));
assertEquals(16, BinaryRow.calculateBitSetWidthInBytes(120));
@@ -355,7 +384,7 @@ public class BinaryRowTest {
}
@Test
- public void testHeader() throws IOException {
+ public void testHeader() {
BinaryRow row = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(row);
@@ -456,4 +485,498 @@ public class BinaryRowTest {
Assert.assertArrayEquals(bytes1, row.getBinary(0));
Assert.assertArrayEquals(bytes2, row.getBinary(1));
}
+
+ @Test
+ public void testBinaryArray() {
+ // 1. write a three-element INT BinaryArray whose middle element is null
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter arrayWriter = new BinaryArrayWriter(
+ array, 3, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+
+ arrayWriter.writeInt(0, 6);
+ arrayWriter.setNullInt(1);
+ arrayWriter.writeInt(2, 666);
+ arrayWriter.complete();
+
+ // assertEquals takes (expected, actual); keep that order so failure messages read correctly
+ assertEquals(6, array.getInt(0));
+ assertTrue(array.isNullAt(1));
+ assertEquals(666, array.getInt(2));
+
+ // 2. write the array into a single-field binary row and read it back
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+ BaseArraySerializer serializer = new BaseArraySerializer(
+ DataTypes.INT().getLogicalType(), new ExecutionConfig());
+ rowWriter.writeArray(0, array, serializer);
+ rowWriter.complete();
+
+ BinaryArray array2 = (BinaryArray) row.getArray(0);
+ assertEquals(array, array2);
+ assertEquals(6, array2.getInt(0));
+ assertTrue(array2.isNullAt(1));
+ assertEquals(666, array2.getInt(2));
+ }
+
+ @Test
+ public void testGenericArray() {
+ // 1. wrap a boxed java array {6, null, 666} as a GenericArray
+ Integer[] javaArray = {6, null, 666};
+ GenericArray array = new GenericArray(javaArray, 3, false);
+
+ // assertEquals takes (expected, actual); keep that order so failure messages read correctly
+ assertEquals(6, array.getInt(0));
+ assertTrue(array.isNullAt(1));
+ assertEquals(666, array.getInt(2));
+
+ // 2. write the generic array into a single-field binary row and read it back
+ BinaryRow row2 = new BinaryRow(1);
+ BinaryRowWriter writer2 = new BinaryRowWriter(row2);
+ BaseArraySerializer serializer = new BaseArraySerializer(
+ DataTypes.INT().getLogicalType(), new ExecutionConfig());
+ writer2.writeArray(0, array, serializer);
+ writer2.complete();
+
+ BaseArray array2 = row2.getArray(0);
+ assertEquals(6, array2.getInt(0));
+ assertTrue(array2.isNullAt(1));
+ assertEquals(666, array2.getInt(2));
+ }
+
+ @Test
+ public void testBinaryMap() {
+ // keys: a four-element INT array
+ BinaryArray array1 = new BinaryArray();
+ BinaryArrayWriter writer1 = new BinaryArrayWriter(
+ array1, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.INT().getLogicalType()));
+ writer1.writeInt(0, 6);
+ writer1.writeInt(1, 5);
+ writer1.writeInt(2, 666);
+ writer1.writeInt(3, 0);
+ writer1.complete();
+
+ // values: a four-element STRING array whose last element is null
+ BinaryArray array2 = new BinaryArray();
+ BinaryArrayWriter writer2 = new BinaryArrayWriter(
+ array2, 4, BinaryArray.calculateFixLengthPartSize(DataTypes.STRING().getLogicalType()));
+ writer2.writeString(0, BinaryString.fromString("6"));
+ writer2.writeString(1, BinaryString.fromString("5"));
+ writer2.writeString(2, BinaryString.fromString("666"));
+ writer2.setNullAt(3, DataTypes.STRING().getLogicalType());
+ writer2.complete();
+
+ BinaryMap binaryMap = BinaryMap.valueOf(array1, array2);
+
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+ // the serializer is declared as (key type, value type): INT keys and STRING values as built above
+ BaseMapSerializer serializer = new BaseMapSerializer(
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ new ExecutionConfig());
+ rowWriter.writeMap(0, binaryMap, serializer);
+ rowWriter.complete();
+
+ // read the map back and check both the whole arrays and individual entries
+ BinaryMap map = (BinaryMap) row.getMap(0);
+ BinaryArray key = map.keyArray();
+ BinaryArray value = map.valueArray();
+
+ assertEquals(binaryMap, map);
+ assertEquals(array1, key);
+ assertEquals(array2, value);
+
+ assertEquals(5, key.getInt(1));
+ assertEquals(BinaryString.fromString("5"), value.getString(1));
+ assertEquals(0, key.getInt(3));
+ assertTrue(value.isNullAt(3));
+ }
+
+ @Test
+ public void testGenericMap() {
+ // INT -> STRING entries; key 0 deliberately maps to null
+ // NOTE(review): raw Map is kept since the exact GenericMap/toJavaMap generic signatures are not visible here
+ Map javaMap = new HashMap();
+ for (int key : new int[]{6, 5, 666}) {
+ javaMap.put(key, BinaryString.fromString(String.valueOf(key)));
+ }
+ javaMap.put(0, null);
+
+ GenericMap genericMap = new GenericMap(javaMap);
+
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter rowWriter = new BinaryRowWriter(row);
+ rowWriter.writeMap(0, genericMap, new BaseMapSerializer(
+ DataTypes.INT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ new ExecutionConfig()));
+ rowWriter.complete();
+
+ // read the binary map back as a java map and check every entry, including the null value
+ Map map = row.getMap(0).toJavaMap(DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType());
+ for (int key : new int[]{6, 5, 666}) {
+ assertEquals(BinaryString.fromString(String.valueOf(key)), map.get(key));
+ }
+ assertTrue(map.containsKey(0));
+ assertNull(map.get(0));
+ }
+
+ @Test
+ public void testGenericObject() throws Exception {
+ TypeSerializer<MyObj> genericSerializer =
+ new GenericTypeInfo<>(MyObj.class).createSerializer(new ExecutionConfig());
+
+ // field 0 is a plain int, fields 1..3 hold MyObj values as generic objects
+ BinaryRow row = new BinaryRow(4);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, 0);
+ writer.writeGeneric(1, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+ // field 2 is materialized before being written; fields 1 and 3 are written as-is
+ BinaryGeneric<MyObj> materialized = new BinaryGeneric<>(new MyObj(123, 5.0), genericSerializer);
+ materialized.ensureMaterialized();
+ writer.writeGeneric(2, materialized);
+ writer.writeGeneric(3, new BinaryGeneric<>(new MyObj(1, 1), genericSerializer));
+ writer.complete();
+
+ assertTestGenericObjectRow(row, genericSerializer);
+
+ // serialize the row into three 64-byte pages, map it back and check it again
+ MemorySegment[] pages = new MemorySegment[3];
+ for (int i = 0; i < pages.length; i++) {
+ pages[i] = MemorySegmentFactory.wrap(new byte[64]);
+ }
+ BinaryRowSerializer serializer = new BinaryRowSerializer(4);
+ serializer.serializeToPages(row, new RandomAccessOutputView(pages, 64));
+
+ BinaryRow mapRow = serializer.mapFromPages(
+ new RandomAccessInputView(new ArrayList<>(Arrays.asList(pages)), 64));
+ assertTestGenericObjectRow(mapRow, genericSerializer);
+ }
+
+ /**
+ * Checks the layout written by {@code testGenericObject}: an int 0 in field 0 followed by three
+ * MyObj generic fields, each decoded with the given serializer.
+ */
+ private void assertTestGenericObjectRow(BinaryRow row, TypeSerializer<MyObj> serializer) {
+ assertEquals(0, row.getInt(0));
+ MyObj[] expected = {new MyObj(0, 1), new MyObj(123, 5.0), new MyObj(1, 1)};
+ for (int pos = 1; pos <= expected.length; pos++) {
+ BinaryGeneric<MyObj> generic = row.getGeneric(pos);
+ assertEquals(expected[pos - 1], BinaryGeneric.getJavaObjectFromBinaryGeneric(generic, serializer));
+ }
+ }
+
+ @Test
+ public void testDateAndTimeAsGenericObject() {
+ Date date = new Date(123);
+ Time time = new Time(456);
+ Timestamp timestamp = new Timestamp(789);
+ LocalDate localDate = LocalDate.of(2019, 7, 16);
+ LocalTime localTime = LocalTime.of(17, 31);
+ LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
+
+ // field 0 is a plain int; fields 1..6 hold each date/time value as a generic object
+ // serialized with its dedicated serializer
+ BinaryRow row = new BinaryRow(7);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, 0);
+ writer.writeGeneric(1, new BinaryGeneric<>(date, SqlDateSerializer.INSTANCE));
+ writer.writeGeneric(2, new BinaryGeneric<>(time, SqlTimeSerializer.INSTANCE));
+ writer.writeGeneric(3, new BinaryGeneric<>(timestamp, SqlTimestampSerializer.INSTANCE));
+ writer.writeGeneric(4, new BinaryGeneric<>(localDate, LocalDateSerializer.INSTANCE));
+ writer.writeGeneric(5, new BinaryGeneric<>(localTime, LocalTimeSerializer.INSTANCE));
+ writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, LocalDateTimeSerializer.INSTANCE));
+ writer.complete();
+
+ // every value must be decoded from the row back to an equal object
+ assertEquals(date, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(1), SqlDateSerializer.INSTANCE));
+ assertEquals(time, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(2), SqlTimeSerializer.INSTANCE));
+ assertEquals(timestamp, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(3), SqlTimestampSerializer.INSTANCE));
+ assertEquals(localDate, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(4), LocalDateSerializer.INSTANCE));
+ assertEquals(localTime, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(5), LocalTimeSerializer.INSTANCE));
+ assertEquals(localDateTime, BinaryGeneric.getJavaObjectFromBinaryGeneric(
+ row.getGeneric(6), LocalDateTimeSerializer.INSTANCE));
+ }
+
+ @Test
+ public void testSerializeVariousSize() throws IOException {
+ // in this test, we are going to start serializing from the i-th byte (i in 0...`segSize`)
+ // and the size of the row we're going to serialize is j bytes
+ // (j in `rowFixLength` to the maximum length we can write)
+
+ int segSize = 64;
+ int segTotalNumber = 3;
+
+ // a single-field row whose payload (1024 random bytes) is larger than all pages together;
+ // testSerialize shrinks the row's total size to j before each run
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ Random random = new Random();
+ byte[] bytes = new byte[1024];
+ random.nextBytes(bytes);
+ writer.writeBinary(0, bytes);
+ writer.complete();
+
+ // msIndex lets testSerialize map the output view's current page back to its index
+ MemorySegment[] memorySegments = new MemorySegment[segTotalNumber];
+ Map<MemorySegment, Integer> msIndex = new HashMap<>();
+ for (int i = 0; i < segTotalNumber; i++) {
+ memorySegments[i] = MemorySegmentFactory.wrap(new byte[segSize]);
+ msIndex.put(memorySegments[i], i);
+ }
+
+ BinaryRowSerializer serializer = new BinaryRowSerializer(1);
+
+ // each serialized row is preceded by its length, written as a 4-byte int
+ int rowSizeInt = 4;
+ // note that as there is only one field in the row, the fixed-length part is 16 bytes (header + 1 field)
+ int rowFixLength = 16;
+ for (int i = 0; i < segSize; i++) {
+ // this is the maximum row size we can serialize
+ // if we are going to serialize from the i-th byte of the input view
+ int maxRowSize = (segSize * segTotalNumber) - i - rowSizeInt;
+ if (segSize - i < rowFixLength + rowSizeInt) {
+ // oops, we can't write the whole fixed-length part in the first segment
+ // because the remaining space is too small, so we have to start serializing from the second segment.
+ // when serializing, we need to first write the length of the row,
+ // then write the fixed-length part of the row.
+ maxRowSize -= segSize - i;
+ }
+ // NOTE(review): the upper bound is exclusive, so a row of exactly maxRowSize bytes is never
+ // exercised; confirm whether that is intended.
+ for (int j = rowFixLength; j < maxRowSize; j++) {
+ // ok, now we're going to serialize a row of j bytes
+ testSerialize(row, memorySegments, msIndex, serializer, i, j);
+ }
+ }
+ }
+
+ /**
+ * Serializes {@code row}, resized to {@code rowSize} bytes, into the given pages starting
+ * {@code position} bytes into the first page. It then maps the row back from exactly the
+ * written pages and checks that the result equals the serialized row.
+ *
+ * @param msIndex maps each page to its index in {@code memorySegments}
+ */
+ private void testSerialize(
+ BinaryRow row, MemorySegment[] memorySegments,
+ Map<MemorySegment, Integer> msIndex, BinaryRowSerializer serializer, int position,
+ int rowSize) throws IOException {
+ // derive the page size from the pages themselves instead of hard-coding 64
+ int segmentSize = memorySegments[0].size();
+ RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, segmentSize);
+ out.skipBytesToWrite(position);
+ // note: this resizes the caller's row in place
+ row.setTotalSize(rowSize);
+
+ // serialize `rowSize` bytes of the row's (random) contents
+ serializer.serializeToPages(row, out);
+
+ // the page the output view stopped in tells how many pages were written
+ int segNumber = msIndex.get(out.getCurrentSegment()) + 1;
+ int lastSegSize = out.getCurrentPositionInSegment();
+
+ // map the row back from exactly the written pages, skipping the same leading bytes
+ ArrayList<MemorySegment> segments = new ArrayList<>(Arrays.asList(memorySegments).subList(0, segNumber));
+ RandomAccessInputView input = new RandomAccessInputView(segments, segmentSize, lastSegSize);
+ input.skipBytesToRead(position);
+ BinaryRow mapRow = serializer.mapFromPages(input);
+
+ assertEquals(row, mapRow);
+ }
+
+ @Test
+ public void testZeroOutPaddingGeneric() {
+ TypeSerializer<MyObj> genericSerializer =
+ new GenericTypeInfo<>(MyObj.class).createSerializer(new ExecutionConfig());
+
+ Random random = new Random();
+ byte[] garbage = new byte[1024];
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+
+ // Write the same generic object twice, each time over different random leftovers from a
+ // large binary field; equal hashes show the leftover bytes do not leak into the row.
+ int[] hashes = new int[2];
+ for (int round = 0; round < hashes.length; round++) {
+ writer.reset();
+ random.nextBytes(garbage);
+ writer.writeBinary(0, garbage);
+ writer.reset();
+ writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+ writer.complete();
+ hashes[round] = row.hashCode();
+ }
+ assertEquals(hashes[0], hashes[1]);
+ }
+
+ @Test
+ public void testZeroOutPaddingString() {
+ Random random = new Random();
+ byte[] garbage = new byte[1024];
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+
+ // Write the same string twice, each time over different random leftovers from a large
+ // binary field; equal hashes show the leftover bytes do not leak into the row.
+ int[] hashes = new int[2];
+ for (int round = 0; round < hashes.length; round++) {
+ writer.reset();
+ random.nextBytes(garbage);
+ writer.writeBinary(0, garbage);
+ writer.reset();
+ writer.writeString(0, BinaryString.fromString("wahahah"));
+ writer.complete();
+ hashes[round] = row.hashCode();
+ }
+ assertEquals(hashes[0], hashes[1]);
+ }
+
+ @Test
+ public void testHashAndCopy() throws IOException {
+ // three 64-byte pages; the two long strings force the serialized row across page boundaries
+ MemorySegment[] pages = new MemorySegment[3];
+ for (int i = 0; i < pages.length; i++) {
+ pages[i] = MemorySegmentFactory.wrap(new byte[64]);
+ }
+ RandomAccessOutputView out = new RandomAccessOutputView(pages, 64);
+ BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeString(0, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahah"));
+ writer.writeString(1, BinaryString.fromString("hahahahahahahahahahahahahahahahahahahhahahahahahahahahaa"));
+ writer.complete();
+ serializer.serializeToPages(row, out);
+
+ BinaryRow mapRow = serializer.mapFromPages(
+ new RandomAccessInputView(new ArrayList<>(Arrays.asList(pages)), 64, 64));
+
+ // the mapped row and its strings must equal the originals; the two strings differ from each other
+ assertEquals(row, mapRow);
+ for (int field = 0; field < 2; field++) {
+ assertEquals(row.getString(field), mapRow.getString(field));
+ }
+ assertNotEquals(row.getString(0), mapRow.getString(1));
+
+ // hash codes must not depend on how the bytes are laid out across pages
+ assertEquals(row.hashCode(), mapRow.hashCode());
+ for (int field = 0; field < 2; field++) {
+ assertEquals(row.getString(field).hashCode(), mapRow.getString(field).hashCode());
+ }
+
+ // copies of the original and of the mapped data must have the same contents
+ assertEquals(row.copy(), mapRow.copy());
+ for (int field = 0; field < 2; field++) {
+ assertEquals(row.getString(field).copy(), mapRow.getString(field).copy());
+ }
+ }
+
+ @Test
+ public void testSerStringToKryo() throws IOException {
+ KryoSerializer<BinaryString> serializer = new KryoSerializer<>(
+ BinaryString.class, new ExecutionConfig());
+
+ BinaryString string = BinaryString.fromString("hahahahaha");
+ RandomAccessOutputView out = new RandomAccessOutputView(
+ new MemorySegment[]{MemorySegmentFactory.wrap(new byte[1024])}, 64);
+ serializer.serialize(string, out);
+
+ RandomAccessInputView input = new RandomAccessInputView(
+ new ArrayList<>(Collections.singletonList(out.getCurrentSegment())), 64, 64);
+ BinaryString newStr = serializer.deserialize(input);
+
+ assertEquals(string, newStr);
+ }
+
+ @Test
+ public void testSerializerPages() throws IOException {
+ // Boundary tests: run the same page-layout scenarios with a single-segment 160-byte row
+ // and with a copy of it spread over several small segments.
+ BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+ BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+ testSerializerPagesInternal(row24, row160);
+
+ BinaryRow multiSegRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+ testSerializerPagesInternal(row24, multiSegRow160);
+ }
+
+ /**
+ * Serializes sequences of a 24-byte and a 160-byte row into 64-byte pages so that rows end
+ * exactly on a page edge, and checks that every row is mapped back unchanged.
+ */
+ private void testSerializerPagesInternal(BinaryRow row24, BinaryRow row160) throws IOException {
+ BinaryRowSerializer serializer = new BinaryRowSerializer(2);
+
+ // 1. the middle row ends exactly on a page edge, followed by a small row
+ {
+ List<BinaryRow> rets = serializeAndMapBackFromPages(serializer, 4, row24, row160, row24);
+ assertEquals(row24, rets.get(0));
+ assertEquals(row160, rets.get(1));
+ assertEquals(row24, rets.get(2));
+ }
+
+ // 2. the middle row ends exactly on a page edge, followed by another large row
+ {
+ List<BinaryRow> rets = serializeAndMapBackFromPages(serializer, 7, row24, row160, row160);
+ assertEquals(row24, rets.get(0));
+ assertEquals(row160, rets.get(1));
+ assertEquals(row160, rets.get(2));
+ }
+
+ // 3. the last row ends exactly on a page edge
+ {
+ List<BinaryRow> rets = serializeAndMapBackFromPages(serializer, 3, row24, row160);
+ assertEquals(row24, rets.get(0));
+ assertEquals(row160, rets.get(1));
+ }
+ }
+
+ /**
+ * Serializes {@code rows} back to back into {@code numSegments} fresh 64-byte pages. It then
+ * maps rows back from those pages until the input view signals EOF and returns copies of them.
+ * {@code rows} must not be empty.
+ *
+ * <p>NOTE(review): the input view covers every page, including unused trailing ones, so more
+ * rows than were written may come back; callers only check the leading entries.
+ */
+ private static List<BinaryRow> serializeAndMapBackFromPages(
+ BinaryRowSerializer serializer, int numSegments, BinaryRow... rows) throws IOException {
+ MemorySegment[] segments = new MemorySegment[numSegments];
+ for (int i = 0; i < segments.length; i++) {
+ segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+ }
+ RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
+ for (BinaryRow row : rows) {
+ serializer.serializeToPages(row, out);
+ }
+
+ RandomAccessInputView in = new RandomAccessInputView(
+ new ArrayList<>(Arrays.asList(segments)),
+ segments[0].size(),
+ out.getCurrentPositionInSegment());
+
+ BinaryRow retRow = new BinaryRow(rows[0].getArity());
+ List<BinaryRow> rets = new ArrayList<>();
+ while (true) {
+ try {
+ retRow = serializer.mapFromPages(retRow, in);
+ } catch (EOFException e) {
+ // running out of pages ends the read loop
+ break;
+ }
+ // the reuse row is overwritten on the next read, so keep a copy
+ rets.add(retRow.copy());
+ }
+ return rets;
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
index d89fa5b..11d3f11 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DataFormatTestUtil.java
@@ -20,10 +20,76 @@ package org.apache.flink.table.dataformat;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.junit.Assert.assertEquals;
+
/**
* Utils for testing data formats.
*/
-class DataFormatTestUtil {
+public class DataFormatTestUtil {
+
+ /**
+ * Returns a two-field binary row whose total size is 24 bytes: an 8-byte header plus two
+ * 8-byte fixed-length slots. Each field holds a random 2-digit numeric string, so nothing
+ * spills into a variable-length part.
+ */
+ public static BinaryRow get24BytesBinaryRow() {
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ for (int pos = 0; pos < 2; pos++) {
+ writer.writeString(pos, BinaryString.fromString(RandomStringUtils.randomNumeric(2)));
+ }
+ writer.complete();
+ return row;
+ }
+
+ /**
+ * Get a binary row of 160 bytes long.
+ */
+ public static BinaryRow get160BytesBinaryRow() {
+ // header (8 bytes) +
+ // 72 byte length string (8 bytes in fixed-length, 72 bytes in variable-length) +
+ // 64 byte length string (8 bytes in fixed-length, 64 bytes in variable-length)
+ BinaryRow row = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeString(0, BinaryString.fromString(RandomStringUtils.randomNumeric(72)));
+ writer.writeString(1, BinaryString.fromString(RandomStringUtils.randomNumeric(64)));
+ writer.complete();
+ return row;
+ }
+
+ /**
+ * Returns a copy of the given row spread over 6 segments of 32 bytes each, starting at offset 8
+ * of the first segment, so its bytes straddle several segment boundaries.
+ * The copy's bytes equal those of the given row; this is asserted before returning.
+ *
+ * <p>The given row must be backed by a single segment and fit into the 6 segments
+ * (at most 6 * 32 - 8 bytes); otherwise an {@link IllegalArgumentException} is thrown.
+ */
+ public static BinaryRow getMultiSeg160BytesBinaryRow(BinaryRow row160) {
+ int segmentSize = 32;
+ int baseOffset = 8;
+ int sizeInBytes = row160.getSizeInBytes();
+ MemorySegment source = row160.getSegments()[0];
+
+ MemorySegment[] segments = new MemorySegment[6];
+ int posInSeg = baseOffset;
+ int remainSize = sizeInBytes;
+ for (int i = 0; i < segments.length; i++) {
+ segments[i] = MemorySegmentFactory.wrap(new byte[segmentSize]);
+ int copy = Math.min(segmentSize - posInSeg, remainSize);
+ // read relative to the source row's own offset rather than assuming it starts at 0
+ source.copyTo(row160.getOffset() + sizeInBytes - remainSize, segments[i], posInSeg, copy);
+ remainSize -= copy;
+ posInSeg = 0;
+ }
+ if (remainSize != 0) {
+ throw new IllegalArgumentException(String.format(
+ "Row of %d bytes does not fit into %d segments of %d bytes starting at offset %d",
+ sizeInBytes, segments.length, segmentSize, baseOffset));
+ }
+
+ BinaryRow multiSegRow160 = new BinaryRow(row160.getArity());
+ multiSegRow160.pointTo(segments, baseOffset, sizeInBytes);
+ assertEquals(row160, multiSegRow160);
+ return multiSegRow160;
+ }
+
+ /**
+ * Get a binary row consisting of 2 segments.
+ * Its first segment is the same with the given input binary row, while its second segment is empty.
+ */
+ public static BinaryRow getMultiSeg160BytesInOneSegRow(BinaryRow row160) {
+ MemorySegment[] segments = new MemorySegment[2];
+ segments[0] = row160.getSegments()[0];
+ segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]);
+ row160.pointTo(segments, 0, row160.getSizeInBytes());
+ return row160;
+ }
/**
* Split the given byte array into two memory segments.
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
index e19299c..e66351a 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/DecimalTest.java
@@ -114,4 +114,12 @@ public class DecimalTest {
Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue());
Assert.assertEquals(0, Decimal.zero(20, 2).toBigDecimal().intValue());
}
+
+ @Test
+ public void testToString() {
+ String val = "0.0000000000000000001";
+ Assert.assertEquals(val, Decimal.castFrom(val, 39, val.length() - 2).toString());
+ val = "123456789012345678901234567890123456789";
+ Assert.assertEquals(val, Decimal.castFrom(val, 39, 0).toString());
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
index c1808bc..a8355f1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/util/SegmentsUtilTest.java
@@ -19,11 +19,20 @@ package org.apache.flink.table.util;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowTest;
+import org.apache.flink.table.dataformat.DataFormatTestUtil;
+import org.apache.flink.table.dataformat.util.BinaryRowUtil;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.flink.table.dataformat.util.BinaryRowUtil.BYTE_ARRAY_BASE_OFFSET;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* Test for {@link SegmentsUtil}, most is covered by {@link BinaryRowTest},
* this just test some boundary scenarios testing.
@@ -53,10 +62,142 @@ public class SegmentsUtilTest {
segments2[0] = MemorySegmentFactory.wrap(new byte[]{6, 0, 2, 5});
segments2[1] = MemorySegmentFactory.wrap(new byte[]{6, 12, 15, 18});
- Assert.assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0));
- Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3));
- Assert.assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6));
- Assert.assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7));
+ assertTrue(SegmentsUtil.equalsMultiSegments(segments1, 0, segments2, 0, 0));
+ assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 3));
+ assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 1, 6));
+ assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 1, 7));
+ }
+
+ @Test
+ public void testBoundaryByteArrayEquals() {
+ // the arrays agree on their first 4 bytes (both hold 81 at index 3) and differ at index 4
+ byte[] shortBytes = new byte[5];
+ byte[] longBytes = new byte[100];
+ shortBytes[3] = 81;
+ longBytes[3] = 81;
+ longBytes[4] = 81;
+
+ assertTrue(BinaryRowUtil.byteArrayEquals(shortBytes, longBytes, 4));
+ assertFalse(BinaryRowUtil.byteArrayEquals(shortBytes, longBytes, 5));
+ // comparing zero bytes always succeeds
+ assertTrue(BinaryRowUtil.byteArrayEquals(shortBytes, longBytes, 0));
+ }
+
+ @Test
+ public void testBoundaryEquals() {
+ BinaryRow row24 = DataFormatTestUtil.get24BytesBinaryRow();
+ BinaryRow row160 = DataFormatTestUtil.get160BytesBinaryRow();
+ BinaryRow varRow160 = DataFormatTestUtil.getMultiSeg160BytesBinaryRow(row160);
+ BinaryRow varRow160InOne = DataFormatTestUtil.getMultiSeg160BytesInOneSegRow(row160);
+
+ // the same 160 bytes under different segment layouts must compare equal, in both argument orders
+ assertEquals(row160, varRow160InOne);
+ assertEquals(varRow160, varRow160InOne);
+ assertEquals(row160, varRow160);
+ assertEquals(varRow160InOne, varRow160);
+
+ // rows of different sizes never compare equal
+ assertNotEquals(row24, row160);
+ assertNotEquals(row24, varRow160);
+ assertNotEquals(row24, varRow160InOne);
+
+ // comparing zero bytes is always true, whatever the contents
+ assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, row160.getSegments(), 0, 0));
+ assertTrue(SegmentsUtil.equals(row24.getSegments(), 0, varRow160.getSegments(), 0, 0));
+
+ // variable segment sizes: 2 x 32 bytes vs 3 x 16 bytes, all zero initially
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ MemorySegment[] segments2 = new MemorySegment[3];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+ // segments1 byte 9 is set, but its counterpart (segments2 byte 14 + 9 = 23) is still 0
+ segments1[0].put(9, (byte) 1);
+ assertFalse(SegmentsUtil.equals(segments1, 0, segments2, 14, 14));
+ // segments2[1] index 7 is absolute byte 16 + 7 = 23, matching segments1 byte 9
+ segments2[1].put(7, (byte) 1);
+ assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 14, 14));
+ // same alignment shifted on both sides: 2 + 7 = 9 and 16 + 7 = 23
+ assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 14));
+ assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 16, 16));
+
+ // segments2[2] index 7 is absolute byte 32 + 7 = 39, matching segments1 byte 2 + 7 = 9
+ segments2[2].put(7, (byte) 1);
+ assertTrue(SegmentsUtil.equals(segments1, 2, segments2, 32, 14));
+ }
+
+ @Test
+ public void testBoundaryCopy() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[0].put(15, (byte) 5);
+ segments1[1].put(15, (byte) 6);
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToBytes(segments1, 0, bytes, 0, 64);
+ assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64));
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToBytes(segments1, 32, bytes, 0, 14);
+ assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14));
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToBytes(segments1, 34, bytes, 0, 14);
+ assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14));
+ }
+ }
+
+ @Test
+ public void testCopyToUnsafe() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[0].put(15, (byte) 5);
+ segments1[1].put(15, (byte) 6);
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64);
+ assertTrue(SegmentsUtil.equals(segments1, 0, segments2, 0, 64));
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+ assertTrue(SegmentsUtil.equals(segments1, 32, segments2, 0, 14));
+ }
+
+ {
+ byte[] bytes = new byte[64];
+ MemorySegment[] segments2 = new MemorySegment[]{MemorySegmentFactory.wrap(bytes)};
+
+ SegmentsUtil.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
+ assertTrue(SegmentsUtil.equals(segments1, 34, segments2, 0, 14));
+ }
+ }
+
+ @Test
+ public void testFind() {
+ MemorySegment[] segments1 = new MemorySegment[2];
+ segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
+ segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+ MemorySegment[] segments2 = new MemorySegment[3];
+ segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
+ segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+
+ assertEquals(34, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 0));
+ assertEquals(-1, SegmentsUtil.find(segments1, 34, 0, segments2, 0, 15));
}
}