You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/18 19:12:16 UTC
[flink] branch release-1.11 updated: [FLINK-16998][core] Add a
changeflag to Row
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 045ff18 [FLINK-16998][core] Add a changeflag to Row
045ff18 is described below
commit 045ff18faec85b19baaa9691e24a1eccc5460d9b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 4 20:17:38 2020 +0200
[FLINK-16998][core] Add a changeflag to Row
Partial commit for supporting a changeflag without backwards compatibility.
This closes #12103.
---
.../apache/flink/api/common/typeinfo/Types.java | 2 +-
.../flink/api/java/typeutils/RowTypeInfo.java | 15 ++
.../runtime/{NullMaskUtils.java => MaskUtils.java} | 37 ++--
.../api/java/typeutils/runtime/RowComparator.java | 66 +++----
.../api/java/typeutils/runtime/RowSerializer.java | 197 ++++++++++++++-------
.../src/main/java/org/apache/flink/types/Row.java | 173 +++++++++++++-----
.../main/java/org/apache/flink/types/RowKind.java | 3 +
...lizerTest.java => LegacyRowSerializerTest.java} | 27 ++-
.../java/typeutils/runtime/RowComparatorTest.java | 34 ++--
.../runtime/RowSerializerMigrationTest.java | 2 +
.../java/typeutils/runtime/RowSerializerTest.java | 42 ++---
.../table/runtime/typeutils/PythonTypeUtils.java | 2 +-
.../serializers/python/RowDataSerializer.java | 8 +-
.../arrow/sources/RowArrowSourceFunctionTest.java | 2 +-
.../apache/flink/table/data/GenericRowData.java | 30 ++++
.../table/data/conversion/RowRowConverter.java | 4 +-
.../table/data/util/DataFormatConverters.java | 4 +-
.../flink/table/data/DataFormatConvertersTest.java | 3 +-
.../table/data/DataStructureConvertersTest.java | 5 +-
19 files changed, 435 insertions(+), 221 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index ede4396..34e0bef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -183,7 +183,7 @@ public class Types {
* <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
* deterministic field order. Every field can be null regardless of the field's type.
* The type of row fields cannot be automatically inferred; therefore, it is required to provide
- * type information whenever a row is used.
+ * type information whenever a row is produced.
*
* <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
* must strictly adhere to the schema defined by the type info.
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index aec070c..e99e0bc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -287,6 +287,21 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
}
/**
+ * Creates a serializer for the old {@link Row} format before Flink 1.11.
+ *
+ * <p>The serialization format has changed from 1.10 to 1.11 and added {@link Row#getKind()}.
+ */
+ @Deprecated
+ public TypeSerializer<Row> createLegacySerializer(ExecutionConfig config) {
+ int len = getArity();
+ TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
+ for (int i = 0; i < len; i++) {
+ fieldSerializers[i] = types[i].createSerializer(config);
+ }
+ return new RowSerializer(fieldSerializers, true);
+ }
+
+ /**
* Returns the field types of the row. The order matches the order of the field names.
*/
public TypeInformation<?>[] getFieldTypes() {
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
similarity index 73%
rename from flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
rename to flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
index cfe562f..ea2830e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
@@ -20,14 +20,19 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Row;
import java.io.IOException;
+/**
+ * Utilities for reading and writing binary masks.
+ */
@Internal
-public class NullMaskUtils {
+public final class MaskUtils {
+
+ @SuppressWarnings("UnusedAssignment")
+ public static void writeMask(boolean[] mask, DataOutputView target) throws IOException {
+ final int len = mask.length;
- public static void writeNullMask(int len, Row value, DataOutputView target) throws IOException {
int b = 0x00;
int bytePos = 0;
@@ -40,8 +45,8 @@ public class NullMaskUtils {
numPos = Math.min(8, len - fieldPos);
while (bytePos < numPos) {
b = b << 1;
- // set bit if field is null
- if (value.getField(fieldPos + bytePos) == null) {
+ // set bit if element is true
+ if (mask[fieldPos + bytePos]) {
b |= 0x01;
}
bytePos += 1;
@@ -54,10 +59,9 @@ public class NullMaskUtils {
}
}
- public static void readIntoNullMask(
- int len,
- DataInputView source,
- boolean[] nullMask) throws IOException {
+ @SuppressWarnings("UnusedAssignment")
+ public static void readIntoMask(DataInputView source, boolean[] mask) throws IOException {
+ final int len = mask.length;
int b = 0x00;
int bytePos = 0;
@@ -70,7 +74,7 @@ public class NullMaskUtils {
bytePos = 0;
numPos = Math.min(8, len - fieldPos);
while (bytePos < numPos) {
- nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+ mask[fieldPos + bytePos] = (b & 0x80) > 0;
b = b << 1;
bytePos += 1;
}
@@ -78,11 +82,12 @@ public class NullMaskUtils {
}
}
- public static void readIntoAndCopyNullMask(
- int len,
- DataInputView source,
- DataOutputView target,
- boolean[] nullMask) throws IOException {
+ @SuppressWarnings("UnusedAssignment")
+ public static void readIntoAndCopyMask(
+ DataInputView source,
+ DataOutputView target,
+ boolean[] mask) throws IOException {
+ final int len = mask.length;
int b = 0x00;
int bytePos = 0;
@@ -97,7 +102,7 @@ public class NullMaskUtils {
bytePos = 0;
numPos = Math.min(8, len - fieldPos);
while (bytePos < numPos) {
- nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+ mask[fieldPos + bytePos] = (b & 0x80) > 0;
b = b << 1;
bytePos += 1;
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
index 135623b..3f80100 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
@@ -32,16 +32,21 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.RowSerializer.ROW_KIND_OFFSET;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
- * Comparator for {@link Row}
+ * Comparator for {@link Row}.
+ *
+ * <p>Note: Since comparators are used only in DataSet API for batch use cases, this comparator assumes the
+ * latest serialization format and ignores {@link Row#getKind()} for simplicity of the implementation
+ * and efficiency.
*/
@Internal
public class RowComparator extends CompositeTypeComparator<Row> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
/** The number of fields of the Row */
private final int arity;
/** key positions describe which fields are keys in what order */
@@ -56,9 +61,10 @@ public class RowComparator extends CompositeTypeComparator<Row> {
private final int normalizableKeyPrefixLen;
private final boolean invertNormKey;
- // null masks for serialized comparison
- private final boolean[] nullMask1;
- private final boolean[] nullMask2;
+ // bitmask for serialized comparison
+ // see serializer for more information about the bitmask encoding
+ private final boolean[] mask1;
+ private final boolean[] mask2;
// cache for the deserialized key field objects
transient private final Object[] deserializedKeyFields1;
@@ -144,8 +150,8 @@ public class RowComparator extends CompositeTypeComparator<Row> {
this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
this.invertNormKey = invertNormKey;
- this.nullMask1 = new boolean[arity];
- this.nullMask2 = new boolean[arity];
+ this.mask1 = new boolean[ROW_KIND_OFFSET + arity];
+ this.mask2 = new boolean[ROW_KIND_OFFSET + arity];
deserializedKeyFields1 = instantiateDeserializationFields();
deserializedKeyFields2 = instantiateDeserializationFields();
}
@@ -251,43 +257,43 @@ public class RowComparator extends CompositeTypeComparator<Row> {
@Override
public int compareSerialized(
- DataInputView firstSource,
- DataInputView secondSource) throws IOException {
-
- int len = serializers.length;
- int keyLen = keyPositions.length;
+ DataInputView firstSource,
+ DataInputView secondSource) throws IOException {
+ final int len = serializers.length;
+ final int keyLen = keyPositions.length;
- readIntoNullMask(arity, firstSource, nullMask1);
- readIntoNullMask(arity, secondSource, nullMask2);
+ // read bitmask
+ readIntoMask(firstSource, mask1);
+ readIntoMask(secondSource, mask2);
- // deserialize
- for (int i = 0; i < len; i++) {
- TypeSerializer<Object> serializer = serializers[i];
+ // deserialize fields
+ for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+ final TypeSerializer<Object> serializer = serializers[fieldPos];
// deserialize field 1
- if (!nullMask1[i]) {
- deserializedKeyFields1[i] = serializer.deserialize(
- deserializedKeyFields1[i],
+ if (!mask1[ROW_KIND_OFFSET + fieldPos]) {
+ deserializedKeyFields1[fieldPos] = serializer.deserialize(
+ deserializedKeyFields1[fieldPos],
firstSource);
}
// deserialize field 2
- if (!nullMask2[i]) {
- deserializedKeyFields2[i] = serializer.deserialize(
- deserializedKeyFields2[i],
+ if (!mask2[ROW_KIND_OFFSET + fieldPos]) {
+ deserializedKeyFields2[fieldPos] = serializer.deserialize(
+ deserializedKeyFields2[fieldPos],
secondSource);
}
}
// compare
- for (int i = 0; i < keyLen; i++) {
- int keyPos = keyPositions[i];
- TypeComparator<Object> comparator = comparators[i];
+ for (int fieldPos = 0; fieldPos < keyLen; fieldPos++) {
+ final int keyPos = keyPositions[fieldPos];
+ final TypeComparator<Object> comparator = comparators[fieldPos];
- boolean isNull1 = nullMask1[keyPos];
- boolean isNull2 = nullMask2[keyPos];
+ final boolean isNull1 = mask1[ROW_KIND_OFFSET + keyPos];
+ final boolean isNull2 = mask2[ROW_KIND_OFFSET + keyPos];
- int cmp = 0;
+ int cmp;
// both values are null -> equality
if (isNull1 && isNull2) {
cmp = 0;
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 505ce7b..a492457 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -28,35 +28,61 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;
+import java.util.Objects;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoAndCopyMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Serializer for {@link Row}.
+ *
+ * <p>It uses the following serialization format:
+ * <pre>
+ * |bitmask|field|field|....
+ * </pre>
+ * The bitmask serves as a header that consists of {@link #ROW_KIND_OFFSET} bits for encoding the
+ * {@link RowKind} and n bits for whether a field is null. For backwards compatibility, those bits
+ * can be ignored if serializer runs in legacy mode:
+ * <pre>
+ * bitmask with row kind: |RK RK F1 F2 ... FN|
+ * bitmask in legacy mode: |F1 F2 ... FN|
+ * </pre>
*/
@Internal
public final class RowSerializer extends TypeSerializer<Row> {
- private static final long serialVersionUID = 1L;
+ public static final int ROW_KIND_OFFSET = 2;
+
+ private static final long serialVersionUID = 2L;
+
+ private final boolean legacyModeEnabled;
+
+ private final int legacyOffset;
private final TypeSerializer<Object>[] fieldSerializers;
private final int arity;
- private transient boolean[] nullMask;
+ private transient boolean[] mask;
- @SuppressWarnings("unchecked")
public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
+ this(fieldSerializers, false);
+ }
+
+ @SuppressWarnings("unchecked")
+ public RowSerializer(TypeSerializer<?>[] fieldSerializers, boolean legacyModeEnabled) {
+ this.legacyModeEnabled = legacyModeEnabled;
+ this.legacyOffset = legacyModeEnabled ? 0 : ROW_KIND_OFFSET;
this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
this.arity = fieldSerializers.length;
- this.nullMask = new boolean[fieldSerializers.length];
+ this.mask = new boolean[legacyOffset + fieldSerializers.length];
}
@Override
@@ -70,7 +96,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
for (int i = 0; i < fieldSerializers.length; i++) {
duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
}
- return new RowSerializer(duplicateFieldSerializers);
+ return new RowSerializer(duplicateFieldSerializers, legacyModeEnabled);
}
@Override
@@ -86,7 +112,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
throw new RuntimeException("Row arity of from does not match serializers.");
}
- Row result = new Row(len);
+ Row result = new Row(from.getKind(), len);
for (int i = 0; i < len; i++) {
Object fromField = from.getField(i);
if (fromField != null) {
@@ -114,6 +140,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
"Row arity of reuse or from is incompatible with this RowSerializer.");
}
+ reuse.setKind(from.getKind());
+
for (int i = 0; i < len; i++) {
Object fromField = from.getField(i);
if (fromField != null) {
@@ -145,39 +173,42 @@ public final class RowSerializer extends TypeSerializer<Row> {
@Override
public void serialize(Row record, DataOutputView target) throws IOException {
- int len = fieldSerializers.length;
+ final int len = fieldSerializers.length;
if (record.getArity() != len) {
throw new RuntimeException("Row arity of from does not match serializers.");
}
- // write a null mask
- writeNullMask(len, record, target);
+ // write bitmask
+ fillMask(len, record, mask, legacyModeEnabled, legacyOffset);
+ writeMask(mask, target);
// serialize non-null fields
- for (int i = 0; i < len; i++) {
- Object o = record.getField(i);
+ for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+ final Object o = record.getField(fieldPos);
if (o != null) {
- fieldSerializers[i].serialize(o, target);
+ fieldSerializers[fieldPos].serialize(o, target);
}
}
}
@Override
public Row deserialize(DataInputView source) throws IOException {
- int len = fieldSerializers.length;
-
- Row result = new Row(len);
-
- // read null mask
- readIntoNullMask(len, source, nullMask);
+ final int len = fieldSerializers.length;
+
+ // read bitmask
+ readIntoMask(source, mask);
+ final Row result;
+ if (legacyModeEnabled) {
+ result = new Row(len);
+ } else {
+ result = new Row(readKindFromMask(mask), len);
+ }
- for (int i = 0; i < len; i++) {
- if (nullMask[i]) {
- result.setField(i, null);
- }
- else {
- result.setField(i, fieldSerializers[i].deserialize(source));
+ // deserialize fields
+ for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+ if (!mask[legacyOffset + fieldPos]) {
+ result.setField(fieldPos, fieldSerializers[fieldPos].deserialize(source));
}
}
@@ -186,26 +217,29 @@ public final class RowSerializer extends TypeSerializer<Row> {
@Override
public Row deserialize(Row reuse, DataInputView source) throws IOException {
- int len = fieldSerializers.length;
+ final int len = fieldSerializers.length;
if (reuse.getArity() != len) {
throw new RuntimeException("Row arity of from does not match serializers.");
}
- // read null mask
- readIntoNullMask(len, source, nullMask);
+ // read bitmask
+ readIntoMask(source, mask);
+ if (!legacyModeEnabled) {
+ reuse.setKind(readKindFromMask(mask));
+ }
- for (int i = 0; i < len; i++) {
- if (nullMask[i]) {
- reuse.setField(i, null);
- }
- else {
- Object reuseField = reuse.getField(i);
+ // deserialize fields
+ for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+ if (mask[legacyOffset + fieldPos]) {
+ reuse.setField(fieldPos, null);
+ } else {
+ Object reuseField = reuse.getField(fieldPos);
if (reuseField != null) {
- reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
+ reuse.setField(fieldPos, fieldSerializers[fieldPos].deserialize(reuseField, source));
}
else {
- reuse.setField(i, fieldSerializers[i].deserialize(source));
+ reuse.setField(fieldPos, fieldSerializers[fieldPos].deserialize(source));
}
}
}
@@ -217,43 +251,68 @@ public final class RowSerializer extends TypeSerializer<Row> {
public void copy(DataInputView source, DataOutputView target) throws IOException {
int len = fieldSerializers.length;
- // copy null mask
- readIntoAndCopyNullMask(len, source, target, nullMask);
+ // copy bitmask
+ readIntoAndCopyMask(source, target, mask);
- for (int i = 0; i < len; i++) {
- if (!nullMask[i]) {
- fieldSerializers[i].copy(source, target);
+ // copy non-null fields
+ for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+ if (!mask[legacyOffset + fieldPos]) {
+ fieldSerializers[fieldPos].copy(source, target);
}
}
}
@Override
- public boolean equals(Object obj) {
- if (obj instanceof RowSerializer) {
- RowSerializer other = (RowSerializer) obj;
- if (this.fieldSerializers.length == other.fieldSerializers.length) {
- for (int i = 0; i < this.fieldSerializers.length; i++) {
- if (!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
- return false;
- }
- }
- return true;
- }
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- return false;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowSerializer that = (RowSerializer) o;
+ return legacyModeEnabled == that.legacyModeEnabled &&
+ Arrays.equals(fieldSerializers, that.fieldSerializers);
}
@Override
public int hashCode() {
- return Arrays.hashCode(fieldSerializers);
+ int result = Objects.hash(legacyModeEnabled);
+ result = 31 * result + Arrays.hashCode(fieldSerializers);
+ return result;
}
// --------------------------------------------------------------------------------------------
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
- this.nullMask = new boolean[fieldSerializers.length];
+ this.mask = new boolean[legacyOffset + fieldSerializers.length];
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serialization utilities
+ // --------------------------------------------------------------------------------------------
+
+ private static void fillMask(
+ int fieldLength,
+ Row row,
+ boolean[] mask,
+ boolean legacyModeEnabled,
+ int legacyOffset) {
+ if (!legacyModeEnabled) {
+ final byte kind = row.getKind().toByteValue();
+ mask[0] = (kind & 0x01) > 0;
+ mask[1] = (kind & 0x02) > 0;
+ }
+
+ for (int fieldPos = 0; fieldPos < fieldLength; fieldPos++) {
+ mask[legacyOffset + fieldPos] = row.getField(fieldPos) == null;
+ }
+ }
+
+ private static RowKind readKindFromMask(boolean[] mask) {
+ final byte kind = (byte) ((mask[0] ? 0x01 : 0x00) + (mask[1] ? 0x02 : 0x00));
+ return RowKind.fromByteValue(kind);
}
// --------------------------------------------------------------------------------------------
@@ -282,7 +341,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
public RowSerializerConfigSnapshot() {
}
- public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
+ public RowSerializerConfigSnapshot(TypeSerializer<?>[] fieldSerializers) {
super(fieldSerializers);
}
@@ -308,11 +367,15 @@ public final class RowSerializer extends TypeSerializer<Row> {
/**
* A {@link TypeSerializerSnapshot} for RowSerializer.
*/
+ // TODO not fully functional yet due to FLINK-17520
public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
- private static final int VERSION = 2;
+ private static final int VERSION = 3;
+
+ private static final int VERSION_WITHOUT_ROW_KIND = 2;
+
+ private boolean legacyModeEnabled = false;
- @SuppressWarnings("WeakerAccess")
public RowSerializerSnapshot() {
super(RowSerializer.class);
}
@@ -327,13 +390,23 @@ public final class RowSerializer extends TypeSerializer<Row> {
}
@Override
+ protected void readOuterSnapshot(
+ int readOuterSnapshotVersion,
+ DataInputView in,
+ ClassLoader userCodeClassLoader) {
+ if (readOuterSnapshotVersion == VERSION_WITHOUT_ROW_KIND) {
+ legacyModeEnabled = true;
+ }
+ }
+
+ @Override
protected TypeSerializer<?>[] getNestedSerializers(RowSerializer outerSerializer) {
return outerSerializer.fieldSerializers;
}
@Override
protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
- return new RowSerializer(nestedSerializers);
+ return new RowSerializer(nestedSerializers, legacyModeEnabled);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index aa15bf9..29ec058 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -18,68 +18,118 @@
package org.apache.flink.types;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Arrays;
/**
- * A Row can have arbitrary number of fields and contain a set of fields, which may all be
- * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
- * type extraction mechanism can't extract correct field types. So that users should manually
- * tell Flink the type information via creating a {@link RowTypeInfo}.
+ * A row is a fixed-length, null-aware composite type for storing multiple values in a deterministic
+ * field order. Every field can be null regardless of the field's type. The type of row fields cannot
+ * be automatically inferred; therefore, it is required to provide type information whenever a row is
+ * produced.
+ *
+ * <p>The main purpose of rows is to bridge between Flink's Table and SQL ecosystem and other APIs. Therefore,
+ * a row does not only consist of a schema part (containing the fields) but also attaches a {@link RowKind}
+ * for encoding a change in a changelog. Thus, a row can be considered as an entry in a changelog. For example,
+ * in regular batch scenarios, a changelog would consist of a bounded stream of {@link RowKind#INSERT} rows.
*
- * <p>
- * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
- * set fields by {@link #setField(int, Object)}.
- * <p>
- * Row is in principle serializable. However, it may contain non-serializable fields,
- * in which case serialization will fail.
+ * <p>The fields of a row can be accessed by position (zero-based) using {@link #getField(int)} and
+ * {@link #setField(int, Object)}. The row kind is kept separate from the fields and can be accessed
+ * by using {@link #getKind()} and {@link #setKind(RowKind)}.
*
+ * <p>A row instance is in principle {@link Serializable}. However, it may contain non-serializable fields
+ * in which case serialization will fail if the row is not serialized with Flink's serialization stack.
*/
@PublicEvolving
-public class Row implements Serializable{
+public final class Row implements Serializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+
+ /** The kind of change a row describes in a changelog. */
+ private RowKind kind;
/** The array to store actual values. */
private final Object[] fields;
/**
- * Create a new Row instance.
- * @param arity The number of fields in the Row
+ * Create a new row instance.
+ *
+ * <p>By default, a row describes an {@link RowKind#INSERT} change.
+ *
+ * @param kind kind of change a row describes in a changelog
+ * @param arity The number of fields in the row.
*/
- public Row(int arity) {
+ public Row(RowKind kind, int arity) {
+ this.kind = Preconditions.checkNotNull(kind, "Row kind must not be null.");
this.fields = new Object[arity];
}
/**
- * Get the number of fields in the Row.
- * @return The number of fields in the Row.
+ * Create a new row instance.
+ *
+ * <p>By default, a row describes an {@link RowKind#INSERT} change.
+ *
+ * @param arity The number of fields in the row.
+ */
+ public Row(int arity) {
+ this(RowKind.INSERT, arity);
+ }
+
+ /**
+ * Returns the kind of change that this row describes in a changelog.
+ *
+ * <p>By default, a row describes an {@link RowKind#INSERT} change.
+ *
+ * @see RowKind
+ */
+ public RowKind getKind() {
+ return kind;
+ }
+
+ /**
+ * Sets the kind of change that this row describes in a changelog.
+ *
+ * <p>By default, a row describes an {@link RowKind#INSERT} change.
+ *
+ * @see RowKind
+ */
+ public void setKind(RowKind kind) {
+ Preconditions.checkNotNull(kind, "Row kind must not be null.");
+ this.kind = kind;
+ }
+
+ /**
+ * Returns the number of fields in the row.
+ *
+ * <p>Note: The row kind is kept separate from the fields and is not included in this number.
+ *
+ * @return The number of fields in the row.
*/
public int getArity() {
return fields.length;
}
/**
- * Gets the field at the specified position.
+ * Returns the field's content at the specified position.
+ *
* @param pos The position of the field, 0-based.
- * @return The field at the specified position.
- * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+ * @return The field's content at the specified position.
*/
- public Object getField(int pos) {
+ public @Nullable Object getField(int pos) {
return fields[pos];
}
/**
- * Sets the field at the specified position.
+ * Sets the field's content at the specified position.
*
* @param pos The position of the field, 0-based.
* @param value The value to be assigned to the field at the specified position.
- * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
*/
- public void setField(int pos, Object value) {
+ public void setField(int pos, @Nullable Object value) {
fields[pos] = value;
}
@@ -103,25 +153,29 @@ public class Row implements Serializable{
if (o == null || getClass() != o.getClass()) {
return false;
}
-
Row row = (Row) o;
-
- return Arrays.deepEquals(fields, row.fields);
+ return kind == row.kind &&
+ Arrays.deepEquals(fields, row.fields);
}
@Override
public int hashCode() {
- return Arrays.deepHashCode(fields);
+ int result = kind.toByteValue(); // for stable hash across JVM instances
+ result = 31 * result + Arrays.deepHashCode(fields);
+ return result;
}
+ // --------------------------------------------------------------------------------------------
+ // Utility methods
+ // --------------------------------------------------------------------------------------------
+
/**
- * Creates a new Row and assigns the given values to the Row's fields.
+ * Creates a new row and assigns the given values to the row's fields.
* This is more convenient than using the constructor.
*
* <p>For example:
- *
* <pre>
- * Row.of("hello", true, 1L);}
+ * Row.of("hello", true, 1L);
* </pre>
* instead of
* <pre>
@@ -131,6 +185,7 @@ public class Row implements Serializable{
* row.setField(2, 1L);
* </pre>
*
+ * <p>By default, a row describes an {@link RowKind#INSERT} change.
*/
public static Row of(Object... values) {
Row row = new Row(values.length);
@@ -141,27 +196,50 @@ public class Row implements Serializable{
}
/**
- * Creates a new Row which copied from another row.
- * This method does not perform a deep copy.
+ * Creates a new row with given kind and assigns the given values to the row's fields.
+ * This is more convenient than using the constructor.
+ *
+ * <p>For example:
+ * <pre>
+ * Row.ofKind(RowKind.INSERT, "hello", true, 1L);
+ * </pre>
+ * instead of
+ * <pre>
+ * Row row = new Row(3);
+ * row.setKind(RowKind.INSERT);
+ * row.setField(0, "hello");
+ * row.setField(1, true);
+ * row.setField(2, 1L);
+ * </pre>
+ */
+ public static Row ofKind(RowKind kind, Object... values) {
+ Row row = new Row(kind, values.length);
+ for (int i = 0; i < values.length; i++) {
+ row.setField(i, values[i]);
+ }
+ return row;
+ }
+
+ /**
+ * Creates a new row which is copied from another row (including its {@link RowKind}).
*
- * @param row The row being copied.
- * @return The cloned new Row
+ * <p>This method does not perform a deep copy.
*/
public static Row copy(Row row) {
- final Row newRow = new Row(row.fields.length);
+ final Row newRow = new Row(row.kind, row.fields.length);
System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
return newRow;
}
/**
- * Creates a new Row with projected fields from another row.
- * This method does not perform a deep copy.
+ * Creates a new row with projected fields and identical {@link RowKind} from another row.
+ *
+ * <p>This method does not perform a deep copy.
*
- * @param fields fields to be projected
- * @return the new projected Row
+ * @param fields field indices to be projected
*/
public static Row project(Row row, int[] fields) {
- final Row newRow = new Row(fields.length);
+ final Row newRow = new Row(row.kind, fields.length);
for (int i = 0; i < fields.length; i++) {
newRow.fields[i] = row.fields[fields[i]];
}
@@ -169,12 +247,11 @@ public class Row implements Serializable{
}
/**
- * Creates a new Row which fields are copied from the other rows.
- * This method does not perform a deep copy.
+ * Creates a new row with fields that are copied from the other rows and appended to the resulting
+ * row in the given order. The {@link RowKind} of the first row determines the {@link RowKind} of
+ * the result.
*
- * @param first The first row being copied.
- * @param remainings The other rows being copied.
- * @return the joined new Row
+ * <p>This method does not perform a deep copy.
*/
public static Row join(Row first, Row... remainings) {
int newLength = first.fields.length;
@@ -182,7 +259,7 @@ public class Row implements Serializable{
newLength += remaining.fields.length;
}
- final Row joinedRow = new Row(newLength);
+ final Row joinedRow = new Row(first.kind, newLength);
int index = 0;
// copy the first row
diff --git a/flink-core/src/main/java/org/apache/flink/types/RowKind.java b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
index a1acf7d..eddc1b2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/RowKind.java
+++ b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
@@ -26,6 +26,9 @@ import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public enum RowKind {
+ // Note: Enums have no stable hash code across different JVMs, use toByteValue() for
+ // this purpose.
+
/**
* Insertion operation.
*/
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
similarity index 89%
copy from flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
copy to flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
index f949431..5c78b85 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -31,12 +30,16 @@ import org.apache.flink.types.Row;
import org.junit.Test;
import java.io.Serializable;
+import java.util.Objects;
-public class RowSerializerTest {
+/**
+ * Tests for the old serialization format of {@link Row} before Flink 1.11.
+ */
+public class LegacyRowSerializerTest {
@Test
public void testRowSerializer() {
- TypeInformation<Row> typeInfo = new RowTypeInfo(
+ RowTypeInfo typeInfo = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
Row row1 = new Row(2);
@@ -47,14 +50,14 @@ public class RowSerializerTest {
row2.setField(0, 2);
row2.setField(1, null);
- TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
instance.testAll();
}
@Test
public void testLargeRowSerializer() {
- TypeInformation<Row> typeInfo = new RowTypeInfo(
+ RowTypeInfo typeInfo = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
@@ -83,14 +86,14 @@ public class RowSerializerTest {
row.setField(11, null);
row.setField(12, "Test");
- TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
testInstance.testAll();
}
@Test
public void testRowSerializerWithComplexTypes() {
- TypeInformation<Row> typeInfo = new RowTypeInfo(
+ RowTypeInfo typeInfo = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
@@ -124,7 +127,7 @@ public class RowSerializerTest {
createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
};
- TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+ TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
testInstance.testAll();
}
@@ -141,7 +144,6 @@ public class RowSerializerTest {
return row;
}
-
private class RowSerializerTestInstance extends SerializerTestInstance<Row> {
RowSerializerTestInstance(
@@ -175,11 +177,8 @@ public class RowSerializerTest {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- MyPojo myPojo = (MyPojo) o;
-
- return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+ final MyPojo myPojo = (MyPojo) o;
+ return Objects.equals(name, myPojo.name);
}
-
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
index ca54bd4..471c692 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
import org.junit.BeforeClass;
import java.io.Serializable;
@@ -51,20 +53,20 @@ public class RowComparatorTest extends ComparatorTestBase<Row> {
private static MyPojo testPojo3 = new MyPojo();
private static final Row[] data = new Row[]{
- createRow(null, null, null, null, null),
- createRow(0, null, null, null, null),
- createRow(0, 0.0, null, null, null),
- createRow(0, 0.0, "a", null, null),
- createRow(1, 0.0, "a", null, null),
- createRow(1, 1.0, "a", null, null),
- createRow(1, 1.0, "b", null, null),
- createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+ createRow(RowKind.INSERT, null, null, null, null, null),
+ createRow(RowKind.INSERT, 0, null, null, null, null),
+ createRow(RowKind.INSERT, 0, 0.0, null, null, null),
+ createRow(RowKind.INSERT, 0, 0.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 0.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 1.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 1.0, "b", null, null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
};
@BeforeClass
@@ -110,8 +112,8 @@ public class RowComparatorTest extends ComparatorTestBase<Row> {
return true;
}
- private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
- Row row = new Row(5);
+ private static Row createRow(RowKind kind, Object f0, Object f1, Object f2, Object f3, Object f4) {
+ Row row = new Row(kind, 5);
row.setField(0, f0);
row.setField(1, f1);
row.setField(2, f2);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
index 7a7888a..1aacd4f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSn
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.types.Row;
+import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -35,6 +36,7 @@ import java.util.Collection;
/**
* State migration test for {@link RowSerializer}.
*/
+@Ignore
@RunWith(Parameterized.class)
public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Row> {
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
index f949431..d117974 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.junit.Test;
import java.io.Serializable;
+import java.util.Objects;
public class RowSerializerTest {
@@ -40,10 +42,12 @@ public class RowSerializerTest {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
Row row1 = new Row(2);
+ row1.setKind(RowKind.UPDATE_BEFORE);
row1.setField(0, 1);
row1.setField(1, "a");
Row row2 = new Row(2);
+ row2.setKind(RowKind.INSERT);
row2.setField(0, 2);
row2.setField(1, null);
@@ -108,20 +112,20 @@ public class RowSerializerTest {
testPojo3.name = "Test2";
Row[] data = new Row[]{
- createRow(null, null, null, null, null),
- createRow(0, null, null, null, null),
- createRow(0, 0.0, null, null, null),
- createRow(0, 0.0, "a", null, null),
- createRow(1, 0.0, "a", null, null),
- createRow(1, 1.0, "a", null, null),
- createRow(1, 1.0, "b", null, null),
- createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
- createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+ createRow(RowKind.INSERT, null, null, null, null, null),
+ createRow(RowKind.INSERT, 0, null, null, null, null),
+ createRow(RowKind.INSERT, 0, 0.0, null, null, null),
+ createRow(RowKind.INSERT, 0, 0.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 0.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 1.0, "a", null, null),
+ createRow(RowKind.INSERT, 1, 1.0, "b", null, null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+ createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+ createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
};
TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
@@ -131,8 +135,8 @@ public class RowSerializerTest {
// ----------------------------------------------------------------------------------------------
- private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
- Row row = new Row(5);
+ private static Row createRow(RowKind kind, Object f0, Object f1, Object f2, Object f3, Object f4) {
+ Row row = new Row(kind, 5);
row.setField(0, f0);
row.setField(1, f1);
row.setField(2, f2);
@@ -175,10 +179,8 @@ public class RowSerializerTest {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- MyPojo myPojo = (MyPojo) o;
-
- return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+ final MyPojo myPojo = (MyPojo) o;
+ return Objects.equals(name, myPojo.name);
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index c919eb7..1ebcc0a 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -292,7 +292,7 @@ public final class PythonTypeUtils {
.stream()
.map(f -> f.getType().accept(this))
.toArray(TypeSerializer[]::new);
- return new RowSerializer(fieldTypeSerializers);
+ return new RowSerializer(fieldTypeSerializers, true);
}
@Override
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
index 68d5250..1fe8115 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.util.Arrays;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
/**
* A {@link TypeSerializer} for {@link RowData}. It should be noted that the header will not be encoded.
@@ -55,7 +55,7 @@ public class RowDataSerializer extends org.apache.flink.table.runtime.typeutils.
super(types, fieldSerializers);
this.fieldTypes = types;
this.fieldSerializers = fieldSerializers;
- this.nullMask = new boolean[fieldTypes.length];
+ this.nullMask = new boolean[fieldSerializers.length];
}
@Override
@@ -79,10 +79,8 @@ public class RowDataSerializer extends org.apache.flink.table.runtime.typeutils.
@Override
public RowData deserialize(DataInputView source) throws IOException {
- int len = fieldSerializers.length;
-
// read null mask
- readIntoNullMask(len, source, nullMask);
+ readIntoMask(source, nullMask);
GenericRowData row = new GenericRowData(fieldSerializers.length);
for (int i = 0; i < row.getArity(); i++) {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
index 46f2ada..9b59650 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
@@ -52,7 +52,7 @@ public class RowArrowSourceFunctionTest extends ArrowSourceFunctionTestBase<Row>
public RowArrowSourceFunctionTest() {
super(VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator),
- new RowSerializer(new TypeSerializer[]{StringSerializer.INSTANCE}),
+ new RowSerializer(new TypeSerializer[]{StringSerializer.INSTANCE}, true),
Comparator.comparing(o -> (String) (o.getField(0))));
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
index 411e65e..4687785 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
@@ -54,6 +54,21 @@ public final class GenericRowData implements RowData {
private RowKind kind;
/**
+ * Creates an instance of {@link GenericRowData} with given kind and number of fields.
+ *
+ * <p>Initially, all fields are set to null.
+ *
+ * <p>Note: All fields of the row must be internal data structures.
+ *
+ * @param kind kind of change that this row describes in a changelog
+ * @param arity number of fields
+ */
+ public GenericRowData(RowKind kind, int arity) {
+ this.fields = new Object[arity];
+ this.kind = kind;
+ }
+
+ /**
* Creates an instance of {@link GenericRowData} with given number of fields.
*
* <p>Initially, all fields are set to null. By default, the row describes a {@link RowKind#INSERT}
@@ -244,4 +259,19 @@ public final class GenericRowData implements RowData {
return row;
}
+
+ /**
+ * Creates an instance of {@link GenericRowData} with given kind and field values.
+ *
+ * <p>Note: All fields of the row must be internal data structures.
+ */
+ public static GenericRowData ofKind(RowKind kind, Object... values) {
+ GenericRowData row = new GenericRowData(kind, values.length);
+
+ for (int i = 0; i < values.length; ++i) {
+ row.setField(i, values[i]);
+ }
+
+ return row;
+ }
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
index 468efcd..c899ff3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -57,7 +57,7 @@ class RowRowConverter implements DataStructureConverter<RowData, Row> {
@Override
public RowData toInternal(Row external) {
final int length = fieldConverters.length;
- final GenericRowData genericRow = new GenericRowData(length);
+ final GenericRowData genericRow = new GenericRowData(external.getKind(), length);
for (int pos = 0; pos < length; pos++) {
final Object value = external.getField(pos);
genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));
@@ -68,7 +68,7 @@ class RowRowConverter implements DataStructureConverter<RowData, Row> {
@Override
public Row toExternal(RowData internal) {
final int length = fieldConverters.length;
- final Row row = new Row(length);
+ final Row row = new Row(internal.getRowKind(), length);
for (int pos = 0; pos < length; pos++) {
final Object value = fieldGetters[pos].getFieldOrNull(internal);
row.setField(pos, fieldConverters[pos].toExternalOrNull(value));
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 1f1089c..347a517 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -1411,7 +1411,7 @@ public class DataFormatConverters {
@Override
RowData toInternalImpl(Row value) {
- GenericRowData genericRow = new GenericRowData(converters.length);
+ GenericRowData genericRow = new GenericRowData(value.getKind(), converters.length);
for (int i = 0; i < converters.length; i++) {
genericRow.setField(i, converters[i].toInternal(value.getField(i)));
}
@@ -1420,7 +1420,7 @@ public class DataFormatConverters {
@Override
Row toExternalImpl(RowData value) {
- Row row = new Row(converters.length);
+ Row row = new Row(value.getRowKind(), converters.length);
for (int i = 0; i < converters.length; i++) {
row.setField(i, converters[i].toExternal(value, i));
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index 4ca0f1c..82e33d1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.junit.Assert;
import org.junit.Test;
@@ -169,7 +170,7 @@ public class DataFormatConvertersTest {
test(simpleTypes[i], simpleValues[i]);
}
test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
- test(new RowTypeInfo(simpleTypes), Row.of(simpleValues));
+ test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues));
test(new RowDataTypeInfo(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
GenericRowData.of(StringData.fromString("hehe"), 111));
test(new RowDataTypeInfo(new VarCharType(VarCharType.MAX_LENGTH), new IntType()), GenericRowData.of(null, null));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 8af45a4..a74517c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Rule;
@@ -235,8 +236,8 @@ public class DataStructureConvertersTest {
ROW(
FIELD("b_1", DOUBLE()),
FIELD("b_2", BOOLEAN())))))
- .convertedTo(Row.class, Row.of(12, Row.of(2.0, null)))
- .convertedTo(RowData.class, GenericRowData.of(12, GenericRowData.of(2.0, null))),
+ .convertedTo(Row.class, Row.ofKind(RowKind.DELETE, 12, Row.of(2.0, null)))
+ .convertedTo(RowData.class, GenericRowData.ofKind(RowKind.DELETE, 12, GenericRowData.of(2.0, null))),
TestSpec
.forDataType(