You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/11/07 10:19:09 UTC
[flink] 04/04: [FLINK-13702][table-planner-blink] Fixed BinaryGeneric & BinaryString materialization
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 814b5fd35fd0308f6edeedb8414623a43ba512a7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Fri Nov 1 15:08:38 2019 +0100
[FLINK-13702][table-planner-blink] Fixed BinaryGeneric & BinaryString materialization
---
.../FirstValueWithRetractAggFunction.java | 6 +-
.../LastValueWithRetractAggFunction.java | 4 +-
.../flink/table/planner/codegen/CodeGenUtils.scala | 5 +-
.../planner/codegen/SortCodeGeneratorTest.java | 16 +-
.../table/dataformat/AbstractBinaryWriter.java | 31 +--
.../apache/flink/table/dataformat/BinaryArray.java | 6 +-
.../flink/table/dataformat/BinaryFormat.java | 87 ++------
.../flink/table/dataformat/BinaryGeneric.java | 49 ++---
.../apache/flink/table/dataformat/BinaryMap.java | 12 +-
.../apache/flink/table/dataformat/BinaryRow.java | 8 +-
.../flink/table/dataformat/BinarySection.java | 74 +++++++
.../flink/table/dataformat/BinaryString.java | 239 ++++++++++++---------
.../flink/table/dataformat/BinaryStringUtil.java | 119 +++++-----
.../flink/table/dataformat/BinaryWriter.java | 5 +-
.../table/dataformat/DataFormatConverters.java | 2 +-
.../flink/table/dataformat/LazyBinaryFormat.java | 73 ++++---
.../apache/flink/table/dataformat/NestedRow.java | 5 +-
.../runtime/typeutils/BinaryGenericSerializer.java | 76 +++----
.../apache/flink/table/dataformat/BaseRowTest.java | 11 +-
.../flink/table/dataformat/BinaryArrayTest.java | 10 +-
.../flink/table/dataformat/BinaryRowTest.java | 63 ++++--
.../flink/table/dataformat/BinaryStringTest.java | 17 +-
.../flink/table/dataformat/NestedRowTest.java | 10 +-
.../runtime/typeutils/BaseArraySerializerTest.java | 2 +-
.../runtime/typeutils/BaseMapSerializerTest.java | 2 +-
.../typeutils/BinaryGenericSerializerTest.java | 21 +-
.../flink/table/utils/BinaryGenericAsserter.java | 78 +++++++
27 files changed, 607 insertions(+), 424 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
index 7bcbbc2..a81d1d8 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/FirstValueWithRetractAggFunction.java
@@ -70,11 +70,9 @@ public abstract class FirstValueWithRetractAggFunction<T> extends AggregateFunct
acc.setField(0, null);
acc.setField(1, null);
acc.setField(2, new BinaryGeneric<>(
- new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG)),
- getValueToOrderMapViewSerializer()));
+ new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG))));
acc.setField(3, new BinaryGeneric<>(
- new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType())),
- getOrderToValueMapViewSerializer()));
+ new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType()))));
return acc;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
index 97f3c99..cd250a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/LastValueWithRetractAggFunction.java
@@ -70,9 +70,9 @@ public abstract class LastValueWithRetractAggFunction<T> extends AggregateFuncti
acc.setField(0, null);
acc.setField(1, null);
acc.setField(2, new BinaryGeneric<>(
- new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG)), getValueToOrderMapViewSerializer()));
+ new MapView<>(getResultType(), new ListTypeInfo<>(Types.LONG))));
acc.setField(3, new BinaryGeneric<>(
- new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType())), getOrderToValueMapViewSerializer()));
+ new MapView<>(Types.LONG, new ListTypeInfo<>(getResultType()))));
return acc;
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index da859ef..560c6a8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -653,8 +653,9 @@ object CodeGenUtils {
case ROW =>
val ser = ctx.addReusableTypeSerializer(t)
s"$writerTerm.writeRow($indexTerm, $fieldValTerm, $ser)"
-
- case ANY => s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm)"
+ case ANY =>
+ val ser = ctx.addReusableTypeSerializer(t)
+ s"$writerTerm.writeGeneric($indexTerm, $fieldValTerm, $ser)"
}
private def isConverterIdentity(t: DataType): Boolean = {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index c055078..5e1aeaf 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.types.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.AbstractRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
@@ -82,6 +83,8 @@ import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
+import static org.junit.Assert.assertThat;
/**
* Random test for sort code generator.
@@ -262,7 +265,7 @@ public class SortCodeGeneratorTest {
}
break;
case ANY:
- seeds[i] = new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+ seeds[i] = new BinaryGeneric<>(rnd.nextInt());
break;
default:
throw new RuntimeException("Not support!");
@@ -314,7 +317,7 @@ public class SortCodeGeneratorTest {
case ROW:
return GenericRow.of(new Object[]{null});
case ANY:
- return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+ return new BinaryGeneric<>(rnd.nextInt());
default:
throw new RuntimeException("Not support!");
}
@@ -355,7 +358,7 @@ public class SortCodeGeneratorTest {
return GenericRow.of(GenericRow.of(new Object[]{null}));
}
case ANY:
- return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+ return new BinaryGeneric<>(rnd.nextInt());
default:
throw new RuntimeException("Not support!");
}
@@ -396,7 +399,7 @@ public class SortCodeGeneratorTest {
return GenericRow.of(GenericRow.of(rnd.nextInt()));
}
case ANY:
- return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
+ return new BinaryGeneric<>(rnd.nextInt());
default:
throw new RuntimeException("Not support!");
}
@@ -562,6 +565,11 @@ public class SortCodeGeneratorTest {
Object o2 = TypeGetterSetters.get(result.get(i), keys[j], keyTypes[j]);
if (keyTypes[j] instanceof VarBinaryType) {
Assert.assertArrayEquals(msg, (byte[]) o1, (byte[]) o2);
+ } else if (keyTypes[j] instanceof TypeInformationAnyType) {
+ assertThat(
+ msg,
+ (BinaryGeneric) o1,
+ equivalent((BinaryGeneric) o2, new BinaryGenericSerializer<>(IntSerializer.INSTANCE)));
} else {
Assert.assertEquals(msg, o1, o2);
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
index 5091c3c..6499528 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/AbstractBinaryWriter.java
@@ -17,12 +17,14 @@
package org.apache.flink.table.dataformat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.runtime.util.SegmentsUtil;
import java.io.IOException;
@@ -30,8 +32,6 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import static org.apache.flink.table.dataformat.BinaryFormat.MAX_FIX_PART_DATA_SIZE;
-
/**
* Use the special format to write data to a {@link MemorySegment} (its capacity grows
* automatically).
@@ -91,7 +91,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
private void writeBytes(int pos, byte[] bytes) {
int len = bytes.length;
- if (len <= MAX_FIX_PART_DATA_SIZE) {
+ if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
} else {
writeBytesToVarLenPart(pos, bytes, len);
@@ -118,24 +118,11 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
}
@Override
- public void writeGeneric(int pos, BinaryGeneric input) {
- if (input.getSegments() == null) {
- int beforeCursor = cursor;
- try {
- input.getJavaObjectSerializer().serialize(input.getJavaObject(), getOutputView());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- int size = cursor - beforeCursor;
- final int roundedSize = roundNumberOfBytesToNearestWord(size);
- int paddingBytes = roundedSize - size;
- ensureCapacity(paddingBytes);
- setOffsetAndSize(pos, beforeCursor, size);
- zeroBytes(cursor, paddingBytes);
- cursor += paddingBytes;
- } else {
- writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
- }
+ @SuppressWarnings("unchecked")
+ public void writeGeneric(int pos, BinaryGeneric input, BinaryGenericSerializer serializer) {
+ TypeSerializer innerSerializer = serializer.getInnerSerializer();
+ input.ensureMaterialized(innerSerializer);
+ writeSegmentsToVarLenPart(pos, input.getSegments(), input.getOffset(), input.getSizeInBytes());
}
@Override
@@ -152,7 +139,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter {
@Override
public void writeBinary(int pos, byte[] bytes) {
int len = bytes.length;
- if (len <= MAX_FIX_PART_DATA_SIZE) {
+ if (len <= BinaryFormat.MAX_FIX_PART_DATA_SIZE) {
writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len);
} else {
writeBytesToVarLenPart(pos, bytes, len);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
index acf5adc..523d96a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java
@@ -35,7 +35,7 @@ import static org.apache.flink.core.memory.MemoryUtils.UNSAFE;
*
* <p>{@code BinaryArray} are influenced by Apache Spark UnsafeArrayData.
*/
-public final class BinaryArray extends BinaryFormat implements BaseArray {
+public final class BinaryArray extends BinarySection implements BaseArray {
/**
* Offset for Arrays.
@@ -174,7 +174,7 @@ public final class BinaryArray extends BinaryFormat implements BaseArray {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = SegmentsUtil.getLong(segments, fieldOffset);
- return BinaryString.readBinaryStringFieldFromSegments(
+ return BinaryFormat.readBinaryStringFieldFromSegments(
segments, offset, fieldOffset, offsetAndSize);
}
@@ -205,7 +205,7 @@ public final class BinaryArray extends BinaryFormat implements BaseArray {
assertIndexIsValid(pos);
int fieldOffset = getElementOffset(pos, 8);
final long offsetAndSize = SegmentsUtil.getLong(segments, fieldOffset);
- return readBinaryFieldFromSegments(
+ return BinaryFormat.readBinaryFieldFromSegments(
segments, offset, fieldOffset, offsetAndSize);
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
index 2a6ccc4..a95ac03 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,10 +22,9 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.table.runtime.util.SegmentsUtil;
/**
- * Binary format that in {@link MemorySegment}s.
+ * Binary format spanning {@link MemorySegment}s.
*/
-public abstract class BinaryFormat {
-
+public interface BinaryFormat {
/**
* It decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRow}.
*
@@ -36,8 +36,7 @@ public abstract class BinaryFormat {
* 1-bit mark(1) = 0, 31-bits offset to the data, and 4-bytes length of data.
* Data is stored in variable-length part.
*/
- static final int MAX_FIX_PART_DATA_SIZE = 7;
-
+ int MAX_FIX_PART_DATA_SIZE = 7;
/**
* To get the mark in highest bit of long.
* Form: 10000000 00000000 ... (8 bytes)
@@ -45,8 +44,7 @@ public abstract class BinaryFormat {
* <p>This is used to decide whether the data is stored in fixed-length part or variable-length
* part. see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
*/
- private static final long HIGHEST_FIRST_BIT = 0x80L << 56;
-
+ long HIGHEST_FIRST_BIT = 0x80L << 56;
/**
* To get the 7 bits length in second bit to eighth bit out of a long.
* Form: 01111111 00000000 ... (8 bytes)
@@ -54,58 +52,13 @@ public abstract class BinaryFormat {
* <p>This is used to get the length of the data which is stored in this long.
* see {@link #MAX_FIX_PART_DATA_SIZE} for more information.
*/
- private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
-
- protected MemorySegment[] segments;
- protected int offset;
- protected int sizeInBytes;
-
- public BinaryFormat() {}
-
- public BinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
- this.segments = segments;
- this.offset = offset;
- this.sizeInBytes = sizeInBytes;
- }
-
- public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
- pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
- }
-
- public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
- this.segments = segments;
- this.offset = offset;
- this.sizeInBytes = sizeInBytes;
- }
-
- public MemorySegment[] getSegments() {
- return segments;
- }
-
- public int getOffset() {
- return offset;
- }
+ long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56;
- public int getSizeInBytes() {
- return sizeInBytes;
- }
+ MemorySegment[] getSegments();
- @Override
- public boolean equals(Object o) {
- return this == o || o != null &&
- getClass() == o.getClass() &&
- binaryEquals((BinaryFormat) o);
- }
+ int getOffset();
- protected boolean binaryEquals(BinaryFormat that) {
- return sizeInBytes == that.sizeInBytes &&
- SegmentsUtil.equals(segments, offset, that.segments, that.offset, sizeInBytes);
- }
-
- @Override
- public int hashCode() {
- return SegmentsUtil.hash(segments, offset, sizeInBytes);
- }
+ int getSizeInBytes();
/**
* Get binary, if len less than 8, will be include in variablePartOffsetAndLen.
@@ -117,7 +70,9 @@ public abstract class BinaryFormat {
* @param variablePartOffsetAndLen a long value, real data or offset and len.
*/
static byte[] readBinaryFieldFromSegments(
- MemorySegment[] segments, int baseOffset, int fieldOffset,
+ MemorySegment[] segments,
+ int baseOffset,
+ int fieldOffset,
long variablePartOffsetAndLen) {
long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
if (mark == 0) {
@@ -145,7 +100,9 @@ public abstract class BinaryFormat {
* @param variablePartOffsetAndLen a long value, real data or offset and len.
*/
static BinaryString readBinaryStringFieldFromSegments(
- MemorySegment[] segments, int baseOffset, int fieldOffset,
+ MemorySegment[] segments,
+ int baseOffset,
+ int fieldOffset,
long variablePartOffsetAndLen) {
long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT;
if (mark == 0) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
index 390e43b..5200f36 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java
@@ -31,48 +31,28 @@ import java.io.IOException;
*/
public final class BinaryGeneric<T> extends LazyBinaryFormat<T> {
- private final TypeSerializer<T> javaObjectSer;
-
- public BinaryGeneric(T javaObject, TypeSerializer<T> javaObjectSer) {
+ public BinaryGeneric(T javaObject) {
super(javaObject);
- this.javaObjectSer = javaObjectSer;
}
- public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes,
- TypeSerializer<T> javaObjectSer) {
+ public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes) {
super(segments, offset, sizeInBytes);
- this.javaObjectSer = javaObjectSer;
}
- public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject,
- TypeSerializer<T> javaObjectSer) {
+ public BinaryGeneric(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
super(segments, offset, sizeInBytes, javaObject);
- this.javaObjectSer = javaObjectSer;
- }
-
- public TypeSerializer<T> getJavaObjectSerializer() {
- return javaObjectSer;
}
@Override
- public void materialize() {
+ protected BinarySection materialize(TypeSerializer<T> serializer) {
try {
- byte[] bytes = InstantiationUtil.serializeToByteArray(javaObjectSer, javaObject);
- pointTo(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+ byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
+ return new BinarySection(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
- public BinaryGeneric<T> copy() {
- ensureMaterialized();
- byte[] bytes = SegmentsUtil.copyToBytes(segments, offset, sizeInBytes);
- T newJavaObject = javaObject == null ? null : javaObjectSer.copy(javaObject);
- return new BinaryGeneric<>(new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, sizeInBytes,
- newJavaObject,
- javaObjectSer);
- }
-
static <T> BinaryGeneric<T> readBinaryGenericFieldFromSegments(
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
final int size = ((int) offsetAndSize);
@@ -91,4 +71,21 @@ public final class BinaryGeneric<T> extends LazyBinaryFormat<T> {
}
return value.getJavaObject();
}
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException("BinaryGeneric cannot be compared");
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException("BinaryGeneric does not have a hashCode");
+ }
+
+ @Override
+ public String toString() {
+ return "BinaryGeneric{" +
+ "javaObject=" + javaObject +
+ '}';
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
index 8660fee..679cff9 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryMap.java
@@ -34,7 +34,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* <p>{@code BinaryMap} are influenced by Apache Spark UnsafeMapData.
*/
-public final class BinaryMap extends BinaryFormat implements BaseMap {
+public final class BinaryMap extends BinarySection implements BaseMap {
private final BinaryArray keys;
private final BinaryArray values;
@@ -102,13 +102,13 @@ public final class BinaryMap extends BinaryFormat implements BaseMap {
}
public static BinaryMap valueOf(BinaryArray key, BinaryArray value) {
- checkArgument(key.getSegments().length == 1 && value.getSegments().length == 1);
- byte[] bytes = new byte[4 + key.getSizeInBytes() + value.getSizeInBytes()];
+ checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
+ byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
MemorySegment segment = MemorySegmentFactory.wrap(bytes);
- segment.putInt(0, key.getSizeInBytes());
- key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.getSizeInBytes());
+ segment.putInt(0, key.sizeInBytes);
+ key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
value.getSegments()[0].copyTo(
- value.getOffset(), segment, 4 + key.getSizeInBytes(), value.getSizeInBytes());
+ value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
BinaryMap map = new BinaryMap();
map.pointTo(segment, 0, bytes.length);
return map;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
index bc98c43..75f7e24 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
@@ -52,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* The difference is that BinaryRow is placed on a discontinuous memory, and the variable length
* type can also be placed on a fixed length area (If it's short enough).
*/
-public final class BinaryRow extends BinaryFormat implements BaseRow {
+public final class BinaryRow extends BinarySection implements BaseRow {
public static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
private static final long FIRST_BYTE_ZERO = LITTLE_ENDIAN ? ~0xFFL : ~(0xFFL << 56L);
@@ -287,7 +287,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = segments[0].getLong(fieldOffset);
- return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+ return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}
@Override
@@ -315,7 +315,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = segments[0].getLong(fieldOffset);
- return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+ return BinaryFormat.readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}
@Override
@@ -411,7 +411,7 @@ public final class BinaryRow extends BinaryFormat implements BaseRow {
}
private boolean equalsFrom(Object o, int startIndex) {
- if (o != null && o instanceof BinaryRow) {
+ if (o instanceof BinaryRow) {
BinaryRow other = (BinaryRow) o;
return sizeInBytes == other.sizeInBytes &&
SegmentsUtil.equals(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java
new file mode 100644
index 0000000..afde68b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinarySection.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.runtime.util.SegmentsUtil;
+
+/**
+ * Binary section of memory.
+ */
+public class BinarySection implements BinaryFormat {
+
+ protected MemorySegment[] segments;
+ protected int offset;
+ protected int sizeInBytes;
+
+ public BinarySection() {}
+
+ public BinarySection(MemorySegment[] segments, int offset, int sizeInBytes) {
+ this.segments = segments;
+ this.offset = offset;
+ this.sizeInBytes = sizeInBytes;
+ }
+
+ public final void pointTo(MemorySegment segment, int offset, int sizeInBytes) {
+ pointTo(new MemorySegment[] {segment}, offset, sizeInBytes);
+ }
+
+ public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
+ this.segments = segments;
+ this.offset = offset;
+ this.sizeInBytes = sizeInBytes;
+ }
+
+ public MemorySegment[] getSegments() {
+ return segments;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getSizeInBytes() {
+ return sizeInBytes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return this == o || o != null &&
+ getClass() == o.getClass() &&
+ sizeInBytes == ((BinarySection) o).sizeInBytes &&
+ SegmentsUtil.equals(segments, offset, ((BinarySection) o).segments, ((BinarySection) o).offset, sizeInBytes);
+ }
+
+ @Override
+ public int hashCode() {
+ return SegmentsUtil.hash(segments, offset, sizeInBytes);
+ }
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
index f3db1eb..78240a6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.dataformat;
import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.runtime.typeutils.BinaryStringTypeInfoFactory;
@@ -116,7 +117,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
ensureMaterialized();
if (inFirstSegment()) {
int len = 0;
- for (int i = 0; i < sizeInBytes; i += numBytesForFirstByte(getByteOneSegment(i))) {
+ for (int i = 0; i < binarySection.sizeInBytes; i += numBytesForFirstByte(getByteOneSegment(i))) {
len++;
}
return len;
@@ -127,10 +128,10 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
private int numCharsMultiSegs() {
int len = 0;
- int segSize = segments[0].size();
+ int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
int i = 0;
- while (i < sizeInBytes) {
+ while (i < binarySection.sizeInBytes) {
int charBytes = numBytesForFirstByte(index.value());
i += charBytes;
len++;
@@ -141,7 +142,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
/**
* Returns the {@code byte} value at the specified index. An index ranges from {@code 0} to
- * {@code getSizeInBytes() - 1}.
+ * {@code binarySection.sizeInBytes - 1}.
*
* @param index the index of the {@code byte} value.
* @return the {@code byte} value at the specified index of this UTF-8 bytes.
@@ -151,12 +152,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
*/
public byte byteAt(int index) {
ensureMaterialized();
- int globalOffset = offset + index;
- int size = segments[0].size();
+ int globalOffset = binarySection.offset + index;
+ int size = binarySection.segments[0].size();
if (globalOffset < size) {
- return segments[0].get(globalOffset);
+ return binarySection.segments[0].get(globalOffset);
} else {
- return segments[globalOffset / size].get(globalOffset % size);
+ return binarySection.segments[globalOffset / size].get(globalOffset % size);
}
}
@@ -165,12 +166,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
*/
public byte[] getBytes() {
ensureMaterialized();
- return SegmentsUtil.getBytes(segments, offset, sizeInBytes);
+ return SegmentsUtil.getBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
}
@Override
public boolean equals(Object o) {
- if (o != null && o instanceof BinaryString) {
+ if (o instanceof BinaryString) {
BinaryString other = (BinaryString) o;
if (javaObject != null && other.javaObject != null) {
return javaObject.equals(other.javaObject);
@@ -178,28 +179,62 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
ensureMaterialized();
other.ensureMaterialized();
- return binaryEquals(other);
+ return binarySection.equals(other.binarySection);
} else {
return false;
}
}
@Override
+ public int hashCode() {
+ ensureMaterialized();
+ return binarySection.hashCode();
+ }
+
+ @Override
public String toString() {
if (javaObject == null) {
- byte[] bytes = SegmentsUtil.allocateReuseBytes(sizeInBytes);
- SegmentsUtil.copyToBytes(segments, offset, bytes, 0, sizeInBytes);
- javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, sizeInBytes);
+ byte[] bytes = SegmentsUtil.allocateReuseBytes(binarySection.sizeInBytes);
+ SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset, bytes, 0, binarySection.sizeInBytes);
+ javaObject = StringUtf8Utils.decodeUTF8(bytes, 0, binarySection.sizeInBytes);
}
return javaObject;
}
@Override
- public void materialize() {
+ public MemorySegment[] getSegments() {
+ ensureMaterialized();
+ return super.getSegments();
+ }
+
+ @Override
+ public int getOffset() {
+ ensureMaterialized();
+ return super.getOffset();
+ }
+
+ @Override
+ public int getSizeInBytes() {
+ ensureMaterialized();
+ return super.getSizeInBytes();
+ }
+
+ public void ensureMaterialized() {
+ ensureMaterialized(null);
+ }
+
+ @Override
+ protected BinarySection materialize(TypeSerializer<String> serializer) {
+ if (serializer != null) {
+ throw new IllegalArgumentException("BinaryString does not support custom serializers");
+ }
+
byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
- segments = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
- offset = 0;
- sizeInBytes = bytes.length;
+ return new BinarySection(
+ new MemorySegment[]{MemorySegmentFactory.wrap(bytes)},
+ 0,
+ bytes.length
+ );
}
/**
@@ -207,9 +242,9 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
*/
public BinaryString copy() {
ensureMaterialized();
- byte[] copy = SegmentsUtil.copyToBytes(segments, offset, sizeInBytes);
+ byte[] copy = SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
return new BinaryString(new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
- 0, sizeInBytes, javaObject);
+ 0, binarySection.sizeInBytes, javaObject);
}
/**
@@ -227,19 +262,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
ensureMaterialized();
other.ensureMaterialized();
- if (segments.length == 1 && other.segments.length == 1) {
+ if (binarySection.segments.length == 1 && other.binarySection.segments.length == 1) {
- int len = Math.min(sizeInBytes, other.sizeInBytes);
- MemorySegment seg1 = segments[0];
- MemorySegment seg2 = other.segments[0];
+ int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
+ MemorySegment seg1 = binarySection.segments[0];
+ MemorySegment seg2 = other.binarySection.segments[0];
for (int i = 0; i < len; i++) {
- int res = (seg1.get(offset + i) & 0xFF) - (seg2.get(other.offset + i) & 0xFF);
+ int res =
+ (seg1.get(binarySection.offset + i) & 0xFF) - (seg2.get(other.binarySection.offset + i) & 0xFF);
if (res != 0) {
return res;
}
}
- return sizeInBytes - other.sizeInBytes;
+ return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
// if there are multi segments.
@@ -251,20 +287,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
*/
private int compareMultiSegments(BinaryString other) {
- if (sizeInBytes == 0 || other.sizeInBytes == 0) {
- return sizeInBytes - other.sizeInBytes;
+ if (binarySection.sizeInBytes == 0 || other.binarySection.sizeInBytes == 0) {
+ return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
- int len = Math.min(sizeInBytes, other.sizeInBytes);
+ int len = Math.min(binarySection.sizeInBytes, other.binarySection.sizeInBytes);
- MemorySegment seg1 = segments[0];
- MemorySegment seg2 = other.segments[0];
+ MemorySegment seg1 = binarySection.segments[0];
+ MemorySegment seg2 = other.binarySection.segments[0];
- int segmentSize = segments[0].size();
- int otherSegmentSize = other.segments[0].size();
+ int segmentSize = binarySection.segments[0].size();
+ int otherSegmentSize = other.binarySection.segments[0].size();
- int sizeOfFirst1 = segmentSize - offset;
- int sizeOfFirst2 = otherSegmentSize - other.offset;
+ int sizeOfFirst1 = segmentSize - binarySection.offset;
+ int sizeOfFirst2 = otherSegmentSize - other.binarySection.offset;
int varSegIndex1 = 1;
int varSegIndex2 = 1;
@@ -272,12 +308,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
// find the first segment of this string.
while (sizeOfFirst1 <= 0) {
sizeOfFirst1 += segmentSize;
- seg1 = segments[varSegIndex1++];
+ seg1 = binarySection.segments[varSegIndex1++];
}
while (sizeOfFirst2 <= 0) {
sizeOfFirst2 += otherSegmentSize;
- seg2 = other.segments[varSegIndex2++];
+ seg2 = other.binarySection.segments[varSegIndex2++];
}
int offset1 = segmentSize - sizeOfFirst1;
@@ -299,20 +335,20 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
len -= needCompare;
// next segment
if (sizeOfFirst1 < sizeOfFirst2) { //I am smaller
- seg1 = segments[varSegIndex1++];
+ seg1 = binarySection.segments[varSegIndex1++];
offset1 = 0;
offset2 += needCompare;
sizeOfFirst1 = segmentSize;
sizeOfFirst2 -= needCompare;
} else if (sizeOfFirst1 > sizeOfFirst2) { //other is smaller
- seg2 = other.segments[varSegIndex2++];
+ seg2 = other.binarySection.segments[varSegIndex2++];
offset2 = 0;
offset1 += needCompare;
sizeOfFirst2 = otherSegmentSize;
sizeOfFirst1 -= needCompare;
} else { // same, should go ahead both.
- seg1 = segments[varSegIndex1++];
- seg2 = other.segments[varSegIndex2++];
+ seg1 = binarySection.segments[varSegIndex1++];
+ seg2 = other.binarySection.segments[varSegIndex2++];
offset1 = 0;
offset2 = 0;
sizeOfFirst1 = segmentSize;
@@ -323,7 +359,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
checkArgument(needCompare == len);
- return sizeInBytes - other.sizeInBytes;
+ return binarySection.sizeInBytes - other.binarySection.sizeInBytes;
}
/**
@@ -343,27 +379,27 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
*/
public BinaryString substring(int beginIndex, int endIndex) {
ensureMaterialized();
- if (endIndex <= beginIndex || beginIndex >= sizeInBytes) {
+ if (endIndex <= beginIndex || beginIndex >= binarySection.sizeInBytes) {
return EMPTY_UTF8;
}
if (inFirstSegment()) {
- MemorySegment segment = segments[0];
+ MemorySegment segment = binarySection.segments[0];
int i = 0;
int c = 0;
- while (i < sizeInBytes && c < beginIndex) {
- i += numBytesForFirstByte(segment.get(i + offset));
+ while (i < binarySection.sizeInBytes && c < beginIndex) {
+ i += numBytesForFirstByte(segment.get(i + binarySection.offset));
c += 1;
}
int j = i;
- while (i < sizeInBytes && c < endIndex) {
- i += numBytesForFirstByte(segment.get(i + offset));
+ while (i < binarySection.sizeInBytes && c < endIndex) {
+ i += numBytesForFirstByte(segment.get(i + binarySection.offset));
c += 1;
}
if (i > j) {
byte[] bytes = new byte[i - j];
- segment.get(offset + j, bytes, 0, i - j);
+ segment.get(binarySection.offset + j, bytes, 0, i - j);
return fromBytes(bytes);
} else {
return EMPTY_UTF8;
@@ -374,11 +410,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
}
private BinaryString substringMultiSegs(final int start, final int until) {
- int segSize = segments[0].size();
+ int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
int i = 0;
int c = 0;
- while (i < sizeInBytes && c < start) {
+ while (i < binarySection.sizeInBytes && c < start) {
int charSize = numBytesForFirstByte(index.value());
i += charSize;
index.skipBytes(charSize, segSize);
@@ -386,7 +422,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
}
int j = i;
- while (i < sizeInBytes && c < until) {
+ while (i < binarySection.sizeInBytes && c < until) {
int charSize = numBytesForFirstByte(index.value());
i += charSize;
index.skipBytes(charSize, segSize);
@@ -394,7 +430,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
}
if (i > j) {
- return fromBytes(SegmentsUtil.copyToBytes(segments, offset + j, i - j));
+ return fromBytes(SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset + j, i - j));
} else {
return EMPTY_UTF8;
}
@@ -410,12 +446,12 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
public boolean contains(final BinaryString s) {
ensureMaterialized();
s.ensureMaterialized();
- if (s.sizeInBytes == 0) {
+ if (s.binarySection.sizeInBytes == 0) {
return true;
}
int find = SegmentsUtil.find(
- segments, offset, sizeInBytes,
- s.segments, s.offset, s.sizeInBytes);
+ binarySection.segments, binarySection.offset, binarySection.sizeInBytes,
+ s.binarySection.segments, s.binarySection.offset, s.binarySection.sizeInBytes);
return find != -1;
}
@@ -446,7 +482,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
public boolean endsWith(final BinaryString suffix) {
ensureMaterialized();
suffix.ensureMaterialized();
- return matchAt(suffix, sizeInBytes - suffix.sizeInBytes);
+ return matchAt(suffix, binarySection.sizeInBytes - suffix.binarySection.sizeInBytes);
}
/**
@@ -461,9 +497,9 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
ensureMaterialized();
if (inFirstSegment()) {
int s = 0;
- int e = this.sizeInBytes - 1;
+ int e = this.binarySection.sizeInBytes - 1;
// skip all of the space (0x20) in the left side
- while (s < this.sizeInBytes && getByteOneSegment(s) == 0x20) {
+ while (s < this.binarySection.sizeInBytes && getByteOneSegment(s) == 0x20) {
s++;
}
// skip all of the space (0x20) in the right side
@@ -483,11 +519,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
private BinaryString trimMultiSegs() {
int s = 0;
- int e = this.sizeInBytes - 1;
- int segSize = segments[0].size();
+ int e = this.binarySection.sizeInBytes - 1;
+ int segSize = binarySection.segments[0].size();
SegmentAndOffset front = firstSegmentAndOffset(segSize);
// skip all of the space (0x20) in the left side
- while (s < this.sizeInBytes && front.value() == 0x20) {
+ while (s < this.binarySection.sizeInBytes && front.value() == 0x20) {
s++;
front.nextByte(segSize);
}
@@ -518,7 +554,7 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
public int indexOf(BinaryString str, int fromIndex) {
ensureMaterialized();
str.ensureMaterialized();
- if (str.sizeInBytes == 0) {
+ if (str.binarySection.sizeInBytes == 0) {
return 0;
}
if (inFirstSegment()) {
@@ -526,21 +562,21 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
int byteIdx = 0;
// position is char
int charIdx = 0;
- while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+ while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
charIdx++;
}
do {
- if (byteIdx + str.sizeInBytes > sizeInBytes) {
+ if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
return -1;
}
- if (SegmentsUtil.equals(segments, offset + byteIdx,
- str.segments, str.offset, str.sizeInBytes)) {
+ if (SegmentsUtil.equals(binarySection.segments, binarySection.offset + byteIdx,
+ str.binarySection.segments, str.binarySection.offset, str.binarySection.sizeInBytes)) {
return charIdx;
}
byteIdx += numBytesForFirstByte(getByteOneSegment(byteIdx));
charIdx++;
- } while (byteIdx < sizeInBytes);
+ } while (byteIdx < binarySection.sizeInBytes);
return -1;
} else {
@@ -553,27 +589,27 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
int byteIdx = 0;
// position is char
int charIdx = 0;
- int segSize = segments[0].size();
+ int segSize = binarySection.segments[0].size();
SegmentAndOffset index = firstSegmentAndOffset(segSize);
- while (byteIdx < sizeInBytes && charIdx < fromIndex) {
+ while (byteIdx < binarySection.sizeInBytes && charIdx < fromIndex) {
int charBytes = numBytesForFirstByte(index.value());
byteIdx += charBytes;
charIdx++;
index.skipBytes(charBytes, segSize);
}
do {
- if (byteIdx + str.sizeInBytes > sizeInBytes) {
+ if (byteIdx + str.binarySection.sizeInBytes > binarySection.sizeInBytes) {
return -1;
}
- if (SegmentsUtil.equals(segments, offset + byteIdx,
- str.segments, str.offset, str.sizeInBytes)) {
+ if (SegmentsUtil.equals(binarySection.segments, binarySection.offset + byteIdx,
+ str.binarySection.segments, str.binarySection.offset, str.binarySection.sizeInBytes)) {
return charIdx;
}
int charBytes = numBytesForFirstByte(index.segment.get(index.offset));
byteIdx += charBytes;
charIdx++;
index.skipBytes(charBytes, segSize);
- } while (byteIdx < sizeInBytes);
+ } while (byteIdx < binarySection.sizeInBytes);
return -1;
}
@@ -587,14 +623,14 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
if (javaObject != null) {
return javaToUpperCase();
}
- if (sizeInBytes == 0) {
+ if (binarySection.sizeInBytes == 0) {
return EMPTY_UTF8;
}
- int size = segments[0].size();
+ int size = binarySection.segments[0].size();
SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
- byte[] bytes = new byte[sizeInBytes];
+ byte[] bytes = new byte[binarySection.sizeInBytes];
bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
- for (int i = 0; i < sizeInBytes; i++) {
+ for (int i = 0; i < binarySection.sizeInBytes; i++) {
byte b = segmentAndOffset.value();
if (numBytesForFirstByte(b) != 1) {
// fallback
@@ -624,14 +660,14 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
if (javaObject != null) {
return javaToLowerCase();
}
- if (sizeInBytes == 0) {
+ if (binarySection.sizeInBytes == 0) {
return EMPTY_UTF8;
}
- int size = segments[0].size();
+ int size = binarySection.segments[0].size();
SegmentAndOffset segmentAndOffset = startSegmentAndOffset(size);
- byte[] bytes = new byte[sizeInBytes];
+ byte[] bytes = new byte[binarySection.sizeInBytes];
bytes[0] = (byte) Character.toTitleCase(segmentAndOffset.value());
- for (int i = 0; i < sizeInBytes; i++) {
+ for (int i = 0; i < binarySection.sizeInBytes; i++) {
byte b = segmentAndOffset.value();
if (numBytesForFirstByte(b) != 1) {
// fallback
@@ -657,11 +693,11 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
// ------------------------------------------------------------------------------------------
byte getByteOneSegment(int i) {
- return segments[0].get(offset + i);
+ return binarySection.segments[0].get(binarySection.offset + i);
}
boolean inFirstSegment() {
- return sizeInBytes + offset <= segments[0].size();
+ return binarySection.sizeInBytes + binarySection.offset <= binarySection.segments[0].size();
}
private boolean matchAt(final BinaryString s, int pos) {
@@ -669,41 +705,50 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
}
private boolean matchAtOneSeg(final BinaryString s, int pos) {
- return s.sizeInBytes + pos <= sizeInBytes && pos >= 0 &&
- segments[0].equalTo(s.segments[0], offset + pos, s.offset, s.sizeInBytes);
+ return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+ binarySection.segments[0].equalTo(
+ s.binarySection.segments[0],
+ binarySection.offset + pos,
+ s.binarySection.offset,
+ s.binarySection.sizeInBytes);
}
private boolean matchAtVarSeg(final BinaryString s, int pos) {
- return s.sizeInBytes + pos <= sizeInBytes && pos >= 0 &&
- SegmentsUtil.equals(segments, offset + pos, s.segments, s.offset, s.sizeInBytes);
+ return s.binarySection.sizeInBytes + pos <= binarySection.sizeInBytes && pos >= 0 &&
+ SegmentsUtil.equals(
+ binarySection.segments,
+ binarySection.offset + pos,
+ s.binarySection.segments,
+ s.binarySection.offset,
+ s.binarySection.sizeInBytes);
}
BinaryString copyBinaryStringInOneSeg(int start, int len) {
byte[] newBytes = new byte[len];
- segments[0].get(offset + start, newBytes, 0, len);
+ binarySection.segments[0].get(binarySection.offset + start, newBytes, 0, len);
return fromBytes(newBytes);
}
BinaryString copyBinaryString(int start, int end) {
int len = end - start + 1;
byte[] newBytes = new byte[len];
- SegmentsUtil.copyToBytes(segments, offset + start, newBytes, 0, len);
+ SegmentsUtil.copyToBytes(binarySection.segments, binarySection.offset + start, newBytes, 0, len);
return fromBytes(newBytes);
}
SegmentAndOffset firstSegmentAndOffset(int segSize) {
- int segIndex = offset / segSize;
- return new SegmentAndOffset(segIndex, offset % segSize);
+ int segIndex = binarySection.offset / segSize;
+ return new SegmentAndOffset(segIndex, binarySection.offset % segSize);
}
SegmentAndOffset lastSegmentAndOffset(int segSize) {
- int lastOffset = offset + sizeInBytes - 1;
+ int lastOffset = binarySection.offset + binarySection.sizeInBytes - 1;
int segIndex = lastOffset / segSize;
return new SegmentAndOffset(segIndex, lastOffset % segSize);
}
private SegmentAndOffset startSegmentAndOffset(int segSize) {
- return inFirstSegment() ? new SegmentAndOffset(0, offset) : firstSegmentAndOffset(segSize);
+ return inFirstSegment() ? new SegmentAndOffset(0, binarySection.offset) : firstSegmentAndOffset(segSize);
}
/**
@@ -716,13 +761,13 @@ public final class BinaryString extends LazyBinaryFormat<String> implements Comp
private SegmentAndOffset(int segIndex, int offset) {
this.segIndex = segIndex;
- this.segment = segments[segIndex];
+ this.segment = binarySection.segments[segIndex];
this.offset = offset;
}
private void assignSegment() {
- segment = segIndex >= 0 && segIndex < segments.length ?
- segments[segIndex] : null;
+ segment = segIndex >= 0 && segIndex < binarySection.segments.length ?
+ binarySection.segments[segIndex] : null;
}
void previousByte(int segSize) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
index 6faa51a..021a8af 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryStringUtil.java
@@ -582,9 +582,9 @@ public class BinaryStringUtil {
int byteIdx = 0;
// position of last split1
int lastSplit1Idx = -1;
- while (byteIdx < str.sizeInBytes) {
+ while (byteIdx < str.getSizeInBytes()) {
// If find next split1 in str, process current kv
- if (str.segments[0].get(str.offset + byteIdx) == split1) {
+ if (str.getSegments()[0].get(str.getOffset() + byteIdx) == split1) {
int currentKeyIdx = lastSplit1Idx + 1;
// If key of current kv is keyName, return the value directly
BinaryString value = findValueOfKey(str, split2, keyName, currentKeyIdx, byteIdx);
@@ -597,7 +597,7 @@ public class BinaryStringUtil {
}
// process the string which is not ends with split1
int currentKeyIdx = lastSplit1Idx + 1;
- return findValueOfKey(str, split2, keyName, currentKeyIdx, str.sizeInBytes);
+ return findValueOfKey(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
} else {
return keyValueSlow(str, split1, split2, keyName);
}
@@ -609,16 +609,16 @@ public class BinaryStringUtil {
BinaryString keyName,
int start,
int end) {
- int keyNameLen = keyName.sizeInBytes;
+ int keyNameLen = keyName.getSizeInBytes();
for (int idx = start; idx < end; idx++) {
- if (str.segments[0].get(str.offset + idx) == split) {
+ if (str.getSegments()[0].get(str.getOffset() + idx) == split) {
if (idx == start + keyNameLen &&
- str.segments[0].equalTo(keyName.segments[0], str.offset + start,
- keyName.offset, keyNameLen)) {
+ str.getSegments()[0].equalTo(keyName.getSegments()[0], str.getOffset() + start,
+ keyName.getOffset(), keyNameLen)) {
int valueIdx = idx + 1;
int valueLen = end - valueIdx;
byte[] bytes = new byte[valueLen];
- str.segments[0].get(str.offset + valueIdx, bytes, 0, valueLen);
+ str.getSegments()[0].get(str.getOffset() + valueIdx, bytes, 0, valueLen);
return fromBytes(bytes, 0, valueLen);
} else {
return null;
@@ -637,7 +637,7 @@ public class BinaryStringUtil {
int byteIdx = 0;
// position of last split1
int lastSplit1Idx = -1;
- while (byteIdx < str.sizeInBytes) {
+ while (byteIdx < str.getSizeInBytes()) {
// If find next split1 in str, process current kv
if (str.byteAt(byteIdx) == split1) {
int currentKeyIdx = lastSplit1Idx + 1;
@@ -650,7 +650,7 @@ public class BinaryStringUtil {
byteIdx++;
}
int currentKeyIdx = lastSplit1Idx + 1;
- return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.sizeInBytes);
+ return findValueOfKeySlow(str, split2, keyName, currentKeyIdx, str.getSizeInBytes());
}
private static BinaryString findValueOfKeySlow(
@@ -659,14 +659,14 @@ public class BinaryStringUtil {
BinaryString keyName,
int start,
int end) {
- int keyNameLen = keyName.sizeInBytes;
+ int keyNameLen = keyName.getSizeInBytes();
for (int idx = start; idx < end; idx++) {
if (str.byteAt(idx) == split) {
if (idx == start + keyNameLen &&
- SegmentsUtil.equals(str.segments, str.offset + start, keyName.segments,
- keyName.offset, keyNameLen)) {
+ SegmentsUtil.equals(str.getSegments(), str.getOffset() + start, keyName.getSegments(),
+ keyName.getOffset(), keyNameLen)) {
int valueIdx = idx + 1;
- byte[] bytes = SegmentsUtil.copyToBytes(str.segments, str.offset + valueIdx, end - valueIdx);
+ byte[] bytes = SegmentsUtil.copyToBytes(str.getSegments(), str.getOffset() + valueIdx, end - valueIdx);
return fromBytes(bytes);
} else {
return null;
@@ -740,8 +740,8 @@ public class BinaryStringUtil {
int offset = 0;
for (BinaryString input : inputs) {
if (input != null) {
- int len = input.sizeInBytes;
- SegmentsUtil.copyToBytes(input.segments, input.offset, result, offset, len);
+ int len = input.getSizeInBytes();
+ SegmentsUtil.copyToBytes(input.getSegments(), input.getOffset(), result, offset, len);
offset += len;
}
}
@@ -771,7 +771,7 @@ public class BinaryStringUtil {
for (BinaryString input : inputs) {
if (input != null) {
input.ensureMaterialized();
- numInputBytes += input.sizeInBytes;
+ numInputBytes += input.getSizeInBytes();
numInputs++;
}
}
@@ -783,21 +783,26 @@ public class BinaryStringUtil {
// Allocate a new byte array, and copy the inputs one by one into it.
// The size of the new array is the size of all inputs, plus the separators.
- final byte[] result = new byte[numInputBytes + (numInputs - 1) * separator.sizeInBytes];
+ final byte[] result = new byte[numInputBytes + (numInputs - 1) * separator.getSizeInBytes()];
int offset = 0;
int j = 0;
for (BinaryString input : inputs) {
if (input != null) {
- int len = input.sizeInBytes;
- SegmentsUtil.copyToBytes(input.segments, input.offset, result, offset, len);
+ int len = input.getSizeInBytes();
+ SegmentsUtil.copyToBytes(input.getSegments(), input.getOffset(), result, offset, len);
offset += len;
j++;
// Add separator if this is not the last input.
if (j < numInputs) {
- SegmentsUtil.copyToBytes(separator.segments, separator.offset, result, offset, separator.sizeInBytes);
- offset += separator.sizeInBytes;
+ SegmentsUtil.copyToBytes(
+ separator.getSegments(),
+ separator.getOffset(),
+ result,
+ offset,
+ separator.getSizeInBytes());
+ offset += separator.getSizeInBytes();
}
}
}
@@ -812,13 +817,13 @@ public class BinaryStringUtil {
public static BinaryString reverse(BinaryString str) {
str.ensureMaterialized();
if (str.inFirstSegment()) {
- byte[] result = new byte[str.sizeInBytes];
+ byte[] result = new byte[str.getSizeInBytes()];
// position in byte
int byteIdx = 0;
- while (byteIdx < str.sizeInBytes) {
+ while (byteIdx < str.getSizeInBytes()) {
int charBytes = numBytesForFirstByte(str.getByteOneSegment(byteIdx));
- str.segments[0].get(
- str.offset + byteIdx,
+ str.getSegments()[0].get(
+ str.getOffset() + byteIdx,
result,
result.length - byteIdx - charBytes,
charBytes);
@@ -831,16 +836,16 @@ public class BinaryStringUtil {
}
private static BinaryString reverseMultiSegs(BinaryString str) {
- byte[] result = new byte[str.sizeInBytes];
+ byte[] result = new byte[str.getSizeInBytes()];
// position in byte
int byteIdx = 0;
- int segSize = str.segments[0].size();
+ int segSize = str.getSegments()[0].size();
BinaryString.SegmentAndOffset index = str.firstSegmentAndOffset(segSize);
- while (byteIdx < str.sizeInBytes) {
+ while (byteIdx < str.getSizeInBytes()) {
int charBytes = numBytesForFirstByte(index.value());
SegmentsUtil.copyMultiSegmentsToBytes(
- str.segments,
- str.offset + byteIdx,
+ str.getSegments(),
+ str.getOffset() + byteIdx,
result,
result.length - byteIdx - charBytes,
charBytes);
@@ -870,14 +875,14 @@ public class BinaryStringUtil {
if (str.inFirstSegment()) {
int s = 0;
// skip all of the space (0x20) in the left side
- while (s < str.sizeInBytes && str.getByteOneSegment(s) == 0x20) {
+ while (s < str.getSizeInBytes() && str.getByteOneSegment(s) == 0x20) {
s++;
}
- if (s == str.sizeInBytes) {
+ if (s == str.getSizeInBytes()) {
// empty string
return EMPTY_UTF8;
} else {
- return str.copyBinaryStringInOneSeg(s, str.sizeInBytes - s);
+ return str.copyBinaryStringInOneSeg(s, str.getSizeInBytes() - s);
}
} else {
return trimLeftSlow(str);
@@ -886,18 +891,18 @@ public class BinaryStringUtil {
private static BinaryString trimLeftSlow(BinaryString str) {
int s = 0;
- int segSize = str.segments[0].size();
+ int segSize = str.getSegments()[0].size();
BinaryString.SegmentAndOffset front = str.firstSegmentAndOffset(segSize);
// skip all of the space (0x20) in the left side
- while (s < str.sizeInBytes && front.value() == 0x20) {
+ while (s < str.getSizeInBytes() && front.value() == 0x20) {
s++;
front.nextByte(segSize);
}
- if (s == str.sizeInBytes) {
+ if (s == str.getSizeInBytes()) {
// empty string
return EMPTY_UTF8;
} else {
- return str.copyBinaryString(s, str.sizeInBytes - 1);
+ return str.copyBinaryString(s, str.getSizeInBytes() - 1);
}
}
@@ -929,7 +934,7 @@ public class BinaryStringUtil {
}
if (str.inFirstSegment()) {
int searchIdx = 0;
- while (searchIdx < str.sizeInBytes) {
+ while (searchIdx < str.getSizeInBytes()) {
int charBytes = numBytesForFirstByte(str.getByteOneSegment(searchIdx));
BinaryString currentChar = str.copyBinaryStringInOneSeg(searchIdx, charBytes);
// try to find the matching for the character in the trimString characters.
@@ -940,10 +945,10 @@ public class BinaryStringUtil {
}
}
// empty string
- if (searchIdx >= str.sizeInBytes) {
+ if (searchIdx >= str.getSizeInBytes()) {
return EMPTY_UTF8;
} else {
- return str.copyBinaryStringInOneSeg(searchIdx, str.sizeInBytes - searchIdx);
+ return str.copyBinaryStringInOneSeg(searchIdx, str.getSizeInBytes() - searchIdx);
}
} else {
return trimLeftSlow(str, trimStr);
@@ -952,9 +957,9 @@ public class BinaryStringUtil {
private static BinaryString trimLeftSlow(BinaryString str, BinaryString trimStr) {
int searchIdx = 0;
- int segSize = str.segments[0].size();
+ int segSize = str.getSegments()[0].size();
BinaryString.SegmentAndOffset front = str.firstSegmentAndOffset(segSize);
- while (searchIdx < str.sizeInBytes) {
+ while (searchIdx < str.getSizeInBytes()) {
int charBytes = numBytesForFirstByte(front.value());
BinaryString currentChar = str.copyBinaryString(searchIdx, searchIdx + charBytes - 1);
if (trimStr.contains(currentChar)) {
@@ -964,18 +969,18 @@ public class BinaryStringUtil {
break;
}
}
- if (searchIdx == str.sizeInBytes) {
+ if (searchIdx == str.getSizeInBytes()) {
// empty string
return EMPTY_UTF8;
} else {
- return str.copyBinaryString(searchIdx, str.sizeInBytes - 1);
+ return str.copyBinaryString(searchIdx, str.getSizeInBytes() - 1);
}
}
public static BinaryString trimRight(BinaryString str) {
str.ensureMaterialized();
if (str.inFirstSegment()) {
- int e = str.sizeInBytes - 1;
+ int e = str.getSizeInBytes() - 1;
// skip all of the space (0x20) in the right side
while (e >= 0 && str.getByteOneSegment(e) == 0x20) {
e--;
@@ -993,8 +998,8 @@ public class BinaryStringUtil {
}
private static BinaryString trimRightSlow(BinaryString str) {
- int e = str.sizeInBytes - 1;
- int segSize = str.segments[0].size();
+ int e = str.getSizeInBytes() - 1;
+ int segSize = str.getSegments()[0].size();
BinaryString.SegmentAndOffset behind = str.lastSegmentAndOffset(segSize);
// skip all of the space (0x20) in the right side
while (e >= 0 && behind.value() == 0x20) {
@@ -1032,10 +1037,10 @@ public class BinaryStringUtil {
int charIdx = 0;
int byteIdx = 0;
// each element in charLens is length of character in the source string
- int[] charLens = new int[str.sizeInBytes];
+ int[] charLens = new int[str.getSizeInBytes()];
// each element in charStartPos is start position of first byte in the source string
- int[] charStartPos = new int[str.sizeInBytes];
- while (byteIdx < str.sizeInBytes) {
+ int[] charStartPos = new int[str.getSizeInBytes()];
+ while (byteIdx < str.getSizeInBytes()) {
charStartPos[charIdx] = byteIdx;
charLens[charIdx] = numBytesForFirstByte(str.getByteOneSegment(byteIdx));
byteIdx += charLens[charIdx];
@@ -1043,7 +1048,7 @@ public class BinaryStringUtil {
}
// searchIdx points to the first character which is not in trim string from the right
// end.
- int searchIdx = str.sizeInBytes - 1;
+ int searchIdx = str.getSizeInBytes() - 1;
charIdx -= 1;
while (charIdx >= 0) {
BinaryString currentChar = str.copyBinaryStringInOneSeg(
@@ -1069,13 +1074,13 @@ public class BinaryStringUtil {
private static BinaryString trimRightSlow(BinaryString str, BinaryString trimStr) {
int charIdx = 0;
int byteIdx = 0;
- int segSize = str.segments[0].size();
+ int segSize = str.getSegments()[0].size();
BinaryString.SegmentAndOffset index = str.firstSegmentAndOffset(segSize);
// each element in charLens is length of character in the source string
- int[] charLens = new int[str.sizeInBytes];
+ int[] charLens = new int[str.getSizeInBytes()];
// each element in charStartPos is start position of first byte in the source string
- int[] charStartPos = new int[str.sizeInBytes];
- while (byteIdx < str.sizeInBytes) {
+ int[] charStartPos = new int[str.getSizeInBytes()];
+ while (byteIdx < str.getSizeInBytes()) {
charStartPos[charIdx] = byteIdx;
int charBytes = numBytesForFirstByte(index.value());
charLens[charIdx] = charBytes;
@@ -1085,7 +1090,7 @@ public class BinaryStringUtil {
}
// searchIdx points to the first character which is not in trim string from the right
// end.
- int searchIdx = str.sizeInBytes - 1;
+ int searchIdx = str.getSizeInBytes() - 1;
charIdx -= 1;
while (charIdx >= 0) {
BinaryString currentChar = str.copyBinaryString(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
index 8163f57..3a559b7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -68,7 +69,7 @@ public interface BinaryWriter {
void writeRow(int pos, BaseRow value, BaseRowSerializer type);
- void writeGeneric(int pos, BinaryGeneric value);
+ void writeGeneric(int pos, BinaryGeneric value, BinaryGenericSerializer serializer);
/**
* Finally, complete write to set real size to binary.
@@ -124,7 +125,7 @@ public interface BinaryWriter {
writer.writeRow(pos, (BaseRow) o, (BaseRowSerializer) serializer);
break;
case ANY:
- writer.writeGeneric(pos, (BinaryGeneric) o);
+ writer.writeGeneric(pos, (BinaryGeneric) o, (BinaryGenericSerializer) serializer);
break;
case BINARY:
case VARBINARY:
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
index b6e6bb2..1185457 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java
@@ -620,7 +620,7 @@ public class DataFormatConverters {
@Override
BinaryGeneric<T> toInternalImpl(T value) {
- return new BinaryGeneric<>(value, serializer);
+ return new BinaryGeneric<>(value);
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
index be29ee1..5f91820 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/LazyBinaryFormat.java
@@ -17,7 +17,11 @@
package org.apache.flink.table.dataformat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import java.io.IOException;
/**
* Lazy binary format.
@@ -37,59 +41,77 @@ import org.apache.flink.core.memory.MemorySegment;
* It can lazy the conversions as much as possible. Only when it is needed can it be converted
* into the required form.
*/
-public abstract class LazyBinaryFormat<T> extends BinaryFormat {
+public abstract class LazyBinaryFormat<T> implements BinaryFormat {
- protected T javaObject;
+ T javaObject;
+ BinarySection binarySection;
public LazyBinaryFormat() {
this(null, -1, -1, null);
}
public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes, T javaObject) {
- super(segments, offset, sizeInBytes);
- this.javaObject = javaObject;
+ this(javaObject, new BinarySection(segments, offset, sizeInBytes));
}
public LazyBinaryFormat(MemorySegment[] segments, int offset, int sizeInBytes) {
- this(segments, offset, sizeInBytes, null);
+ this(null, new BinarySection(segments, offset, sizeInBytes));
}
public LazyBinaryFormat(T javaObject) {
- this(null, -1, -1, javaObject);
+ this(javaObject, null);
+ }
+
+ public LazyBinaryFormat(T javaObject, BinarySection binarySection) {
+ this.javaObject = javaObject;
+ this.binarySection = binarySection;
}
public T getJavaObject() {
return javaObject;
}
+ /**
+ * Must be public as it is used during code generation.
+ */
public void setJavaObject(T javaObject) {
this.javaObject = javaObject;
}
@Override
public MemorySegment[] getSegments() {
- ensureMaterialized();
- return segments;
+ if (binarySection == null) {
+ throw new IllegalStateException("Lazy Binary Format was not materialized");
+ }
+ return binarySection.segments;
}
@Override
public int getOffset() {
- ensureMaterialized();
- return offset;
+ if (binarySection == null) {
+ throw new IllegalStateException("Lazy Binary Format was not materialized");
+ }
+ return binarySection.offset;
}
@Override
public int getSizeInBytes() {
- ensureMaterialized();
- return sizeInBytes;
+ if (binarySection == null) {
+ throw new IllegalStateException("Lazy Binary Format was not materialized");
+ }
+ return binarySection.sizeInBytes;
}
/**
* Ensure we have materialized binary format.
*/
- public void ensureMaterialized() {
- if (segments == null) {
- materialize();
+ public final void ensureMaterialized(TypeSerializer<T> serializer) {
+ if (binarySection == null) {
+ try {
+ this.binarySection = materialize(serializer);
+ } catch (IOException e) {
+ throw new WrappingRuntimeException(e);
+ }
}
}
@@ -98,24 +120,5 @@ public abstract class LazyBinaryFormat<T> extends BinaryFormat {
* Inherited classes need to hold the information they need.
* (For example, BinaryGeneric needs javaObjectSerializer).
*/
- public abstract void materialize();
-
- @Override
- public boolean equals(Object o) {
- if (o != null && o instanceof LazyBinaryFormat) {
- LazyBinaryFormat other = (LazyBinaryFormat) o;
-
- ensureMaterialized();
- other.ensureMaterialized();
- return binaryEquals(other);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- ensureMaterialized();
- return super.hashCode();
- }
+ protected abstract BinarySection materialize(TypeSerializer<T> serializer) throws IOException;
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
index 252eec9..f31a138 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java
@@ -21,6 +21,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.runtime.util.SegmentsUtil;
+import static org.apache.flink.table.dataformat.BinaryFormat.readBinaryFieldFromSegments;
import static org.apache.flink.table.dataformat.BinaryRow.calculateBitSetWidthInBytes;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -32,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* has a possibility to cross the boundary of a segment, while the fixed-length part of {@link BinaryRow}
* must fit into its first memory segment.
*/
-public final class NestedRow extends BinaryFormat implements BaseRow {
+public final class NestedRow extends BinarySection implements BaseRow {
private final int arity;
private final int nullBitsSizeInBytes;
@@ -224,7 +225,7 @@ public final class NestedRow extends BinaryFormat implements BaseRow {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndLen = SegmentsUtil.getLong(segments, fieldOffset);
- return BinaryString.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
+ return BinaryFormat.readBinaryStringFieldFromSegments(segments, offset, fieldOffset, offsetAndLen);
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
index b0ca78f..29ba7eb 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializer.java
@@ -19,18 +19,15 @@
package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.runtime.util.SegmentsUtil;
-import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
@@ -55,12 +52,19 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
@Override
public BinaryGeneric<T> createInstance() {
- return new BinaryGeneric<>(serializer.createInstance(), serializer);
+ return new BinaryGeneric<>(serializer.createInstance());
}
@Override
public BinaryGeneric<T> copy(BinaryGeneric<T> from) {
- return from.copy();
+ from.ensureMaterialized(serializer);
+ byte[] bytes = SegmentsUtil.copyToBytes(from.getSegments(), from.getOffset(), from.getSizeInBytes());
+ T newJavaObject = from.getJavaObject() == null ? null : serializer.copy(from.getJavaObject());
+ return new BinaryGeneric<>(
+ new MemorySegment[]{MemorySegmentFactory.wrap(bytes)},
+ 0,
+ bytes.length,
+ newJavaObject);
}
@Override
@@ -75,6 +79,7 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
@Override
public void serialize(BinaryGeneric<T> record, DataOutputView target) throws IOException {
+ record.ensureMaterialized(serializer);
target.writeInt(record.getSizeInBytes());
SegmentsUtil.copyToView(record.getSegments(), record.getOffset(), record.getSizeInBytes(), target);
}
@@ -86,7 +91,8 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
source.readFully(bytes);
return new BinaryGeneric<>(
new MemorySegment[] {MemorySegmentFactory.wrap(bytes)},
- 0, bytes.length, serializer);
+ 0,
+ bytes.length);
}
@Override
@@ -103,7 +109,7 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
@Override
public BinaryGenericSerializer<T> duplicate() {
- return this;
+ return new BinaryGenericSerializer<>(serializer.duplicate());
}
@Override
@@ -127,63 +133,41 @@ public final class BinaryGenericSerializer<T> extends TypeSerializer<BinaryGener
@Override
public TypeSerializerSnapshot<BinaryGeneric<T>> snapshotConfiguration() {
- return new BinaryGenericSerializerSnapshot<>(serializer);
+ return new BinaryGenericSerializerSnapshot<>(this);
+ }
+
+ public TypeSerializer<T> getInnerSerializer() {
+ return serializer;
}
/**
* {@link TypeSerializerSnapshot} for {@link BinaryGenericSerializer}.
*/
- public static final class BinaryGenericSerializerSnapshot<T> implements TypeSerializerSnapshot<BinaryGeneric<T>> {
- private static final int CURRENT_VERSION = 3;
-
- private TypeSerializer<T> previousSerializer;
+ public static final class BinaryGenericSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<BinaryGeneric<T>, BinaryGenericSerializer<T>> {
@SuppressWarnings("unused")
public BinaryGenericSerializerSnapshot() {
- // this constructor is used when restoring from a checkpoint/savepoint.
- }
-
- BinaryGenericSerializerSnapshot(TypeSerializer<T> serializer) {
- this.previousSerializer = serializer;
+ super(BinaryGenericSerializer.class);
}
- @Override
- public int getCurrentVersion() {
- return CURRENT_VERSION;
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- InstantiationUtil.serializeObject(new DataOutputViewStream(out), previousSerializer);
+ public BinaryGenericSerializerSnapshot(BinaryGenericSerializer<T> serializerInstance) {
+ super(serializerInstance);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- try {
- this.previousSerializer = InstantiationUtil.deserializeObject(
- new DataInputViewStream(in), userCodeClassLoader);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
+ protected int getCurrentOuterSnapshotVersion() {
+ return 0;
}
@Override
- public TypeSerializer<BinaryGeneric<T>> restoreSerializer() {
- return new BinaryGenericSerializer<>(previousSerializer);
+ protected TypeSerializer<?>[] getNestedSerializers(BinaryGenericSerializer<T> outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.serializer};
}
@Override
- public TypeSerializerSchemaCompatibility<BinaryGeneric<T>> resolveSchemaCompatibility(TypeSerializer<BinaryGeneric<T>> newSerializer) {
- if (!(newSerializer instanceof BinaryGenericSerializer)) {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
-
- BinaryGenericSerializer newBinaryGenericSerializer = (BinaryGenericSerializer) newSerializer;
- if (!previousSerializer.equals(newBinaryGenericSerializer.serializer)) {
- return TypeSerializerSchemaCompatibility.incompatible();
- } else {
- return TypeSerializerSchemaCompatibility.compatibleAsIs();
- }
+ @SuppressWarnings("unchecked")
+ protected BinaryGenericSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ return new BinaryGenericSerializer<>((TypeSerializer<T>) nestedSerializers[0]);
}
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
index a86e001..82019d0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -30,9 +31,11 @@ import org.junit.Test;
import java.math.BigDecimal;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -48,11 +51,13 @@ public class BaseRowTest {
private BinaryMap map;
private BinaryRow underRow;
private byte[] bytes;
+ private BinaryGenericSerializer<String> genericSerializer;
@Before
public void before() {
str = BinaryString.fromString("haha");
- generic = new BinaryGeneric<>("haha", StringSerializer.INSTANCE);
+ generic = new BinaryGeneric<>("haha");
+ genericSerializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
decimal1 = Decimal.fromLong(10, 5, 0);
decimal2 = Decimal.fromBigDecimal(new BigDecimal(11), 20, 0);
array = new BinaryArray();
@@ -98,7 +103,7 @@ public class BaseRowTest {
writer.writeFloat(5, 5);
writer.writeDouble(6, 6);
writer.writeString(8, str);
- writer.writeGeneric(9, generic);
+ writer.writeGeneric(9, generic, genericSerializer);
writer.writeDecimal(10, decimal1, 5);
writer.writeDecimal(11, decimal2, 20);
writer.writeArray(12, array, new BaseArraySerializer(DataTypes.INT().getLogicalType(), null));
@@ -193,7 +198,7 @@ public class BaseRowTest {
assertEquals(5, (int) row.getFloat(5));
assertEquals(6, (int) row.getDouble(6));
assertEquals(str, row.getString(8));
- assertEquals(generic, row.getGeneric(9));
+ assertThat(row.getGeneric(9), equivalent(generic, genericSerializer));
assertEquals(decimal1, row.getDecimal(10, 5, 0));
assertEquals(decimal2, row.getDecimal(11, 20, 0));
assertEquals(array, row.getArray(12));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
index f64907e..9de5cdf 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.runtime.util.SegmentsUtil;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -36,7 +37,9 @@ import org.junit.Test;
import java.math.BigDecimal;
import static org.apache.flink.table.dataformat.BinaryString.fromString;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -460,13 +463,14 @@ public class BinaryArrayTest {
public void testGeneric() {
BinaryArray array = new BinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
- BinaryGeneric<String> generic = new BinaryGeneric<>("hahah", StringSerializer.INSTANCE);
- writer.writeGeneric(0, generic);
+ BinaryGeneric<String> generic = new BinaryGeneric<>("hahah");
+ BinaryGenericSerializer<String> serializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
+ writer.writeGeneric(0, generic, serializer);
writer.setNullAt(1);
writer.complete();
BinaryGeneric newGeneric = array.getGeneric(0);
- assertEquals(generic, newGeneric);
+ assertThat(newGeneric, equivalent(generic, serializer));
assertTrue(array.isNullAt(1));
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
index fa870b4..3a90aaa 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.runtime.typeutils.BaseArraySerializer;
import org.apache.flink.table.runtime.typeutils.BaseMapSerializer;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -68,11 +69,13 @@ import java.util.Set;
import static org.apache.flink.table.dataformat.BinaryString.fromBytes;
import static org.apache.flink.table.dataformat.BinaryString.fromString;
import static org.apache.flink.table.dataformat.DataFormatTestUtil.MyObj;
+import static org.apache.flink.table.utils.BinaryGenericAsserter.equivalent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -444,18 +447,18 @@ public class BinaryRowTest {
public void testGeneric() {
BinaryRow row = new BinaryRow(3);
BinaryRowWriter writer = new BinaryRowWriter(row);
- BinaryGeneric<String> hahah = new BinaryGeneric<>("hahah", StringSerializer.INSTANCE);
- writer.writeGeneric(0, hahah);
+ BinaryGenericSerializer<String> binarySerializer = new BinaryGenericSerializer<>(StringSerializer.INSTANCE);
+ BinaryGeneric<String> hahah = new BinaryGeneric<>("hahah");
+ writer.writeGeneric(0, hahah, binarySerializer);
writer.setNullAt(1);
- hahah.ensureMaterialized();
- writer.writeGeneric(2, hahah);
+ writer.writeGeneric(2, hahah, binarySerializer);
writer.complete();
BinaryGeneric<String> generic0 = row.getGeneric(0);
- assertEquals(hahah, generic0);
+ assertThat(generic0, equivalent(hahah, binarySerializer));
assertTrue(row.isNullAt(1));
BinaryGeneric<String> generic2 = row.getGeneric(2);
- assertEquals(hahah, generic2);
+ assertThat(generic2, equivalent(hahah, binarySerializer));
}
@Test
@@ -619,18 +622,18 @@ public class BinaryRowTest {
GenericTypeInfo<MyObj> info = new GenericTypeInfo<>(MyObj.class);
TypeSerializer<MyObj> genericSerializer = info.createSerializer(new ExecutionConfig());
+ BinaryGenericSerializer<MyObj> binarySerializer = new BinaryGenericSerializer<>(genericSerializer);
BinaryRow row = new BinaryRow(4);
BinaryRowWriter writer = new BinaryRowWriter(row);
writer.writeInt(0, 0);
- BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1), genericSerializer);
- writer.writeGeneric(1, myObj1);
- BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0), genericSerializer);
- myObj2.ensureMaterialized();
- writer.writeGeneric(2, myObj2);
- BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1), genericSerializer);
- writer.writeGeneric(3, myObj3);
+ BinaryGeneric<MyObj> myObj1 = new BinaryGeneric<>(new MyObj(0, 1));
+ writer.writeGeneric(1, myObj1, binarySerializer);
+ BinaryGeneric<MyObj> myObj2 = new BinaryGeneric<>(new MyObj(123, 5.0));
+ writer.writeGeneric(2, myObj2, binarySerializer);
+ BinaryGeneric<MyObj> myObj3 = new BinaryGeneric<>(new MyObj(1, 1));
+ writer.writeGeneric(3, myObj3, binarySerializer);
writer.complete();
assertTestGenericObjectRow(row, genericSerializer);
@@ -670,12 +673,30 @@ public class BinaryRowTest {
LocalDateTime localDateTime = LocalDateTime.of(localDate, localTime);
writer.writeInt(0, 0);
- writer.writeGeneric(1, new BinaryGeneric<>(new Date(123), SqlDateSerializer.INSTANCE));
- writer.writeGeneric(2, new BinaryGeneric<>(new Time(456), SqlTimeSerializer.INSTANCE));
- writer.writeGeneric(3, new BinaryGeneric<>(new Timestamp(789), SqlTimestampSerializer.INSTANCE));
- writer.writeGeneric(4, new BinaryGeneric<>(localDate, LocalDateSerializer.INSTANCE));
- writer.writeGeneric(5, new BinaryGeneric<>(localTime, LocalTimeSerializer.INSTANCE));
- writer.writeGeneric(6, new BinaryGeneric<>(localDateTime, LocalDateTimeSerializer.INSTANCE));
+ writer.writeGeneric(
+ 1,
+ new BinaryGeneric<>(new Date(123)),
+ new BinaryGenericSerializer<>(SqlDateSerializer.INSTANCE));
+ writer.writeGeneric(
+ 2,
+ new BinaryGeneric<>(new Time(456)),
+ new BinaryGenericSerializer<>(SqlTimeSerializer.INSTANCE));
+ writer.writeGeneric(
+ 3,
+ new BinaryGeneric<>(new Timestamp(789)),
+ new BinaryGenericSerializer<>(SqlTimestampSerializer.INSTANCE));
+ writer.writeGeneric(
+ 4,
+ new BinaryGeneric<>(localDate),
+ new BinaryGenericSerializer<>(LocalDateSerializer.INSTANCE));
+ writer.writeGeneric(
+ 5,
+ new BinaryGeneric<>(localTime),
+ new BinaryGenericSerializer<>(LocalTimeSerializer.INSTANCE));
+ writer.writeGeneric(
+ 6,
+ new BinaryGeneric<>(localDateTime),
+ new BinaryGenericSerializer<>(LocalDateTimeSerializer.INSTANCE));
writer.complete();
assertEquals(new Date(123), BinaryGeneric.getJavaObjectFromBinaryGeneric(
@@ -781,7 +802,7 @@ public class BinaryRowTest {
random.nextBytes(bytes);
writer.writeBinary(0, bytes);
writer.reset();
- writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+ writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1)), new BinaryGenericSerializer<>(genericSerializer));
writer.complete();
int hash1 = row.hashCode();
@@ -789,7 +810,7 @@ public class BinaryRowTest {
random.nextBytes(bytes);
writer.writeBinary(0, bytes);
writer.reset();
- writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1), genericSerializer));
+ writer.writeGeneric(0, new BinaryGeneric<>(new MyObj(0, 1)), new BinaryGenericSerializer<>(genericSerializer));
writer.complete();
int hash2 = row.hashCode();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
index 015c7ff..c6a7649 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java
@@ -54,6 +54,7 @@ import static org.apache.flink.table.dataformat.BinaryStringUtil.trimRight;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -596,25 +597,25 @@ public class BinaryStringTest {
@Test
public void testEmptyString() {
BinaryString str2 = fromString("hahahahah");
- BinaryString str3 = new BinaryString();
+ BinaryString str3;
{
MemorySegment[] segments = new MemorySegment[2];
segments[0] = MemorySegmentFactory.wrap(new byte[10]);
segments[1] = MemorySegmentFactory.wrap(new byte[10]);
- str3.pointTo(segments, 15, 0);
+ str3 = BinaryString.fromAddress(segments, 15, 0);
}
assertTrue(BinaryString.EMPTY_UTF8.compareTo(str2) < 0);
assertTrue(str2.compareTo(BinaryString.EMPTY_UTF8) > 0);
- assertTrue(BinaryString.EMPTY_UTF8.compareTo(str3) == 0);
- assertTrue(str3.compareTo(BinaryString.EMPTY_UTF8) == 0);
+ assertEquals(0, BinaryString.EMPTY_UTF8.compareTo(str3));
+ assertEquals(0, str3.compareTo(BinaryString.EMPTY_UTF8));
- assertFalse(BinaryString.EMPTY_UTF8.equals(str2));
- assertFalse(str2.equals(BinaryString.EMPTY_UTF8));
+ assertNotEquals(BinaryString.EMPTY_UTF8, str2);
+ assertNotEquals(str2, BinaryString.EMPTY_UTF8);
- assertTrue(BinaryString.EMPTY_UTF8.equals(str3));
- assertTrue(str3.equals(BinaryString.EMPTY_UTF8));
+ assertEquals(BinaryString.EMPTY_UTF8, str3);
+ assertEquals(str3, BinaryString.EMPTY_UTF8);
}
@Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
index 2ba0237..23e3713 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/NestedRowTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.runtime.typeutils.BaseRowSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.runtime.typeutils.BinaryStringSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.Test;
@@ -137,7 +139,7 @@ public class NestedRowTest {
gRow.setField(1, 5L);
gRow.setField(2, BinaryString.fromString("12345678"));
gRow.setField(3, null);
- gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5), genericSerializer));
+ gRow.setField(4, new BinaryGeneric<>(new MyObj(15, 5)));
BaseRowSerializer serializer = new BaseRowSerializer(
new LogicalType[]{
@@ -150,9 +152,9 @@ public class NestedRowTest {
new TypeSerializer[]{
IntSerializer.INSTANCE,
LongSerializer.INSTANCE,
- StringSerializer.INSTANCE,
- StringSerializer.INSTANCE,
- genericSerializer
+ BinaryStringSerializer.INSTANCE,
+ BinaryStringSerializer.INSTANCE,
+ new BinaryGenericSerializer<>(genericSerializer)
});
writer.writeRow(0, gRow, serializer);
writer.complete();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
index 100aef0..35bbc69 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseArraySerializerTest.java
@@ -84,7 +84,7 @@ public class BaseArraySerializerTest extends SerializerTestBase<BaseArray> {
MyObj inputObj = new MyObj(114514, 1919810);
BaseArray inputArray = new GenericArray(new BinaryGeneric[] {
- new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config))
+ new BinaryGeneric<>(inputObj)
}, 1);
byte[] serialized;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
index afd1cf4..02c0b67 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BaseMapSerializerTest.java
@@ -82,7 +82,7 @@ public class BaseMapSerializerTest extends SerializerTestBase<BaseMap> {
int inputKey = 998244353;
MyObj inputObj = new MyObj(114514, 1919810);
Map<Object, Object> javaMap = new HashMap<>();
- javaMap.put(inputKey, new BinaryGeneric<>(inputObj, new KryoSerializer<>(MyObj.class, config)));
+ javaMap.put(inputKey, new BinaryGeneric<>(inputObj));
BaseMap inputMap = new GenericMap(javaMap);
byte[] serialized;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
index d472392..f35227c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/typeutils/BinaryGenericSerializerTest.java
@@ -21,11 +21,22 @@ package org.apache.flink.table.runtime.typeutils;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.utils.BinaryGenericAsserter;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
/**
* A test for the {@link BinaryGenericSerializer}.
*/
public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneric<String>> {
+ public BinaryGenericSerializerTest() {
+ super(new DeeplyEqualsChecker()
+ .withCustomCheck(
+ (o, o2) -> o instanceof BinaryGeneric && o2 instanceof BinaryGeneric,
+ (o, o2, checker) -> BinaryGenericAsserter.equivalent(
+ (BinaryGeneric) o2,
+ new BinaryGenericSerializer<>(StringSerializer.INSTANCE)).matches(o)
+ ));
+ }
@Override
protected BinaryGenericSerializer<String> createSerializer() {
@@ -45,11 +56,11 @@ public class BinaryGenericSerializerTest extends SerializerTestBase<BinaryGeneri
@Override
protected BinaryGeneric[] getTestData() {
return new BinaryGeneric[] {
- new BinaryGeneric<>("1", StringSerializer.INSTANCE),
- new BinaryGeneric<>("2", StringSerializer.INSTANCE),
- new BinaryGeneric<>("3", StringSerializer.INSTANCE),
- new BinaryGeneric<>("4", StringSerializer.INSTANCE),
- new BinaryGeneric<>("5", StringSerializer.INSTANCE)
+ new BinaryGeneric<>("1"),
+ new BinaryGeneric<>("2"),
+ new BinaryGeneric<>("3"),
+ new BinaryGeneric<>("4"),
+ new BinaryGeneric<>("5")
};
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java
new file mode 100644
index 0000000..98af4a9
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/utils/BinaryGenericAsserter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils;
+
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer;
+import org.apache.flink.table.runtime.util.SegmentsUtil;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.util.Arrays;
+
+/**
+ * A {@link org.hamcrest.Matcher} that checks {@link BinaryGeneric}s for equality by comparing their
+ * materialized binary representations (size in bytes and content).
+ */
+public class BinaryGenericAsserter extends TypeSafeMatcher<BinaryGeneric> {
+	private final BinaryGeneric expected;
+	private final BinaryGenericSerializer serializer;
+
+	private BinaryGenericAsserter(
+			BinaryGeneric expected,
+			BinaryGenericSerializer serializer) {
+		this.expected = expected;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Checks that the {@link BinaryGeneric} is equivalent to the expected one. The serializer will be used
+	 * to ensure both objects are materialized into the binary form.
+	 *
+	 * @param expected the expected object
+	 * @param serializer serializer used to materialize the underlying java object
+	 * @return binary equality matcher
+	 */
+	@SuppressWarnings("unchecked")
+	public static BinaryGenericAsserter equivalent(BinaryGeneric expected, BinaryGenericSerializer serializer) {
+		expected.ensureMaterialized(serializer.getInnerSerializer());
+		return new BinaryGenericAsserter(expected, serializer);
+	}
+
+	/**
+	 * Materializes {@code item} with the inner serializer and compares its size and bytes with the
+	 * expected value. Note that this materializes {@code item} as a side effect.
+	 */
+	@Override
+	protected boolean matchesSafely(BinaryGeneric item) {
+		materialize(item);
+		materialize(expected);
+
+		return item.getSizeInBytes() == expected.getSizeInBytes() &&
+			SegmentsUtil.equals(
+				item.getSegments(),
+				item.getOffset(),
+				expected.getSegments(),
+				expected.getOffset(),
+				item.getSizeInBytes());
+	}
+
+	@Override
+	public void describeTo(Description description) {
+		description.appendText(toByteString(expected));
+	}
+
+	/**
+	 * Reports the actual bytes in the same format as {@link #describeTo(Description)}, so the expected and
+	 * actual values can be compared directly in a failure message.
+	 */
+	@Override
+	protected void describeMismatchSafely(BinaryGeneric item, Description mismatchDescription) {
+		materialize(item);
+		mismatchDescription.appendText("was ").appendText(toByteString(item));
+	}
+
+	/** Ensures the binary form of {@code value} is available, using the inner serializer. */
+	@SuppressWarnings("unchecked")
+	private void materialize(BinaryGeneric value) {
+		value.ensureMaterialized(serializer.getInnerSerializer());
+	}
+
+	/** Renders the materialized bytes of {@code value} as a human-readable string. */
+	private static String toByteString(BinaryGeneric value) {
+		byte[] bytes = SegmentsUtil.getBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes());
+		return Arrays.toString(bytes);
+	}
+}