Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/18 19:09:04 UTC

[flink] branch master updated: [FLINK-16998][core] Add a changeflag to Row

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f356799  [FLINK-16998][core] Add a changeflag to Row
f356799 is described below

commit f35679966eac9e3bb53a02bcdbd36dbd1341d405
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon May 4 20:17:38 2020 +0200

    [FLINK-16998][core] Add a changeflag to Row
    
    Partial commit for supporting a changeflag without backwards compatibility.
    
    This closes #12103.
---
 .../apache/flink/api/common/typeinfo/Types.java    |   2 +-
 .../flink/api/java/typeutils/RowTypeInfo.java      |  15 ++
 .../runtime/{NullMaskUtils.java => MaskUtils.java} |  37 ++--
 .../api/java/typeutils/runtime/RowComparator.java  |  66 +++----
 .../api/java/typeutils/runtime/RowSerializer.java  | 197 ++++++++++++++-------
 .../src/main/java/org/apache/flink/types/Row.java  | 173 +++++++++++++-----
 .../main/java/org/apache/flink/types/RowKind.java  |   3 +
 ...lizerTest.java => LegacyRowSerializerTest.java} |  27 ++-
 .../java/typeutils/runtime/RowComparatorTest.java  |  34 ++--
 .../runtime/RowSerializerMigrationTest.java        |   2 +
 .../java/typeutils/runtime/RowSerializerTest.java  |  42 ++---
 .../table/runtime/typeutils/PythonTypeUtils.java   |   2 +-
 .../serializers/python/RowDataSerializer.java      |   8 +-
 .../arrow/sources/RowArrowSourceFunctionTest.java  |   2 +-
 .../apache/flink/table/data/GenericRowData.java    |  30 ++++
 .../table/data/conversion/RowRowConverter.java     |   4 +-
 .../table/data/util/DataFormatConverters.java      |   4 +-
 .../flink/table/data/DataFormatConvertersTest.java |   3 +-
 .../table/data/DataStructureConvertersTest.java    |   5 +-
 19 files changed, 435 insertions(+), 221 deletions(-)
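
Before the diff itself: the user-facing effect of the new change flag, as a minimal sketch that uses only the Row API added in this commit (rows created through the existing constructors keep the default kind RowKind.INSERT):

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class RowKindSketch {
        public static void main(String[] args) {
            // Rows created the old way default to an INSERT change.
            Row row = Row.of("Alice", 42);
            System.out.println(row.getKind());          // INSERT

            // The kind can be changed in place ...
            row.setKind(RowKind.UPDATE_BEFORE);

            // ... or set directly via the new factory method.
            Row deletion = Row.ofKind(RowKind.DELETE, "Alice", 42);
            System.out.println(deletion.getKind());     // DELETE
        }
    }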

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
index ede4396..34e0bef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java
@@ -183,7 +183,7 @@ public class Types {
 	 * <p>A row is a fixed-length, null-aware composite type for storing multiple values in a
 	 * deterministic field order. Every field can be null regardless of the field's type.
 	 * The type of row fields cannot be automatically inferred; therefore, it is required to provide
-	 * type information whenever a row is used.
+	 * type information whenever a row is produced.
 	 *
 	 * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> fields, however, all row instances
 	 * must strictly adhere to the schema defined by the type info.
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
index aec070c..e99e0bc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java
@@ -287,6 +287,21 @@ public class RowTypeInfo extends TupleTypeInfoBase<Row> {
 	}
 
 	/**
+	 * Creates a serializer for the old {@link Row} format before Flink 1.11.
+	 *
+	 * <p>The serialization format has changed from 1.10 to 1.11 and added {@link Row#getKind()}.
+	 */
+	@Deprecated
+	public TypeSerializer<Row> createLegacySerializer(ExecutionConfig config) {
+		int len = getArity();
+		TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
+		for (int i = 0; i < len; i++) {
+			fieldSerializers[i] = types[i].createSerializer(config);
+		}
+		return new RowSerializer(fieldSerializers, true);
+	}
+
+	/**
 	 * Returns the field types of the row. The order matches the order of the field names.
 	 */
 	public TypeInformation<?>[] getFieldTypes() {
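
The deprecated createLegacySerializer above exists because the default serializer produced by RowTypeInfo now writes the new format that includes the row kind. A rough sketch of obtaining the two side by side (same classes as used in the tests further below):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    public class LegacySerializerSketch {
        public static void main(String[] args) {
            RowTypeInfo typeInfo = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

            // 1.11+ format: the bitmask header starts with two row-kind bits.
            TypeSerializer<Row> current = typeInfo.createSerializer(new ExecutionConfig());

            // Pre-1.11 format: the bitmask header contains only the null flags.
            TypeSerializer<Row> legacy = typeInfo.createLegacySerializer(new ExecutionConfig());

            System.out.println(current.equals(legacy));  // false, different formats
        }
    }
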
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
similarity index 73%
rename from flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
rename to flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
index cfe562f..ea2830e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullMaskUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/MaskUtils.java
@@ -20,14 +20,19 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Row;
 
 import java.io.IOException;
 
+/**
+ * Utilities for reading and writing binary masks.
+ */
 @Internal
-public class NullMaskUtils {
+public final class MaskUtils {
+
+	@SuppressWarnings("UnusedAssignment")
+	public static void writeMask(boolean[] mask, DataOutputView target) throws IOException {
+		final int len = mask.length;
 
-	public static void writeNullMask(int len, Row value, DataOutputView target) throws IOException {
 		int b = 0x00;
 		int bytePos = 0;
 
@@ -40,8 +45,8 @@ public class NullMaskUtils {
 			numPos = Math.min(8, len - fieldPos);
 			while (bytePos < numPos) {
 				b = b << 1;
-				// set bit if field is null
-				if (value.getField(fieldPos + bytePos) == null) {
+				// set bit if element is true
+				if (mask[fieldPos + bytePos]) {
 					b |= 0x01;
 				}
 				bytePos += 1;
@@ -54,10 +59,9 @@ public class NullMaskUtils {
 		}
 	}
 
-	public static void readIntoNullMask(
-		int len,
-		DataInputView source,
-		boolean[] nullMask) throws IOException {
+	@SuppressWarnings("UnusedAssignment")
+	public static void readIntoMask(DataInputView source, boolean[] mask) throws IOException {
+		final int len = mask.length;
 
 		int b = 0x00;
 		int bytePos = 0;
@@ -70,7 +74,7 @@ public class NullMaskUtils {
 			bytePos = 0;
 			numPos = Math.min(8, len - fieldPos);
 			while (bytePos < numPos) {
-				nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+				mask[fieldPos + bytePos] = (b & 0x80) > 0;
 				b = b << 1;
 				bytePos += 1;
 			}
@@ -78,11 +82,12 @@ public class NullMaskUtils {
 		}
 	}
 
-	public static void readIntoAndCopyNullMask(
-		int len,
-		DataInputView source,
-		DataOutputView target,
-		boolean[] nullMask) throws IOException {
+	@SuppressWarnings("UnusedAssignment")
+	public static void readIntoAndCopyMask(
+			DataInputView source,
+			DataOutputView target,
+			boolean[] mask) throws IOException {
+		final int len = mask.length;
 
 		int b = 0x00;
 		int bytePos = 0;
@@ -97,7 +102,7 @@ public class NullMaskUtils {
 			bytePos = 0;
 			numPos = Math.min(8, len - fieldPos);
 			while (bytePos < numPos) {
-				nullMask[fieldPos + bytePos] = (b & 0x80) > 0;
+				mask[fieldPos + bytePos] = (b & 0x80) > 0;
 				b = b << 1;
 				bytePos += 1;
 			}
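
MaskUtils now packs an arbitrary boolean[] eight flags per byte, most significant bit first (the read path tests (b & 0x80)). The following is a simplified, self-contained illustration of that bit layout rather than the exact Flink implementation:

    import java.util.Arrays;

    public class MaskLayoutSketch {

        // Pack flags MSB-first, eight per byte; unused bits of the last byte stay zero here.
        static byte[] pack(boolean[] mask) {
            final byte[] packed = new byte[(mask.length + 7) / 8];
            for (int i = 0; i < mask.length; i++) {
                if (mask[i]) {
                    packed[i / 8] |= (byte) (0x80 >>> (i % 8));
                }
            }
            return packed;
        }

        // Unpack into a mask whose length is known from the schema, as in readIntoMask.
        static void unpack(byte[] packed, boolean[] mask) {
            for (int i = 0; i < mask.length; i++) {
                mask[i] = (packed[i / 8] & (0x80 >>> (i % 8))) != 0;
            }
        }

        public static void main(String[] args) {
            boolean[] mask = {true, false, true, true, false, false, false, true, true};
            boolean[] roundTrip = new boolean[mask.length];
            unpack(pack(mask), roundTrip);
            System.out.println(Arrays.equals(mask, roundTrip));  // true
        }
    }
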
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
index 135623b..3f80100 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java
@@ -32,16 +32,21 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.RowSerializer.ROW_KIND_OFFSET;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * Comparator for {@link Row}
+ * Comparator for {@link Row}.
+ *
+ * <p>Note: Since comparators are used only in DataSet API for batch use cases, this comparator assumes the
+ * latest serialization format and ignores {@link Row#getKind()} for simplicity of the implementation
+ * and efficiency.
  */
 @Internal
 public class RowComparator extends CompositeTypeComparator<Row> {
 
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 2L;
 	/** The number of fields of the Row */
 	private final int arity;
 	/** key positions describe which fields are keys in what order */
@@ -56,9 +61,10 @@ public class RowComparator extends CompositeTypeComparator<Row> {
 	private final int normalizableKeyPrefixLen;
 	private final boolean invertNormKey;
 
-	// null masks for serialized comparison
-	private final boolean[] nullMask1;
-	private final boolean[] nullMask2;
+	// bitmask for serialized comparison
+	// see serializer for more information about the bitmask encoding
+	private final boolean[] mask1;
+	private final boolean[] mask2;
 
 	// cache for the deserialized key field objects
 	transient private final Object[] deserializedKeyFields1;
@@ -144,8 +150,8 @@ public class RowComparator extends CompositeTypeComparator<Row> {
 		this.numLeadingNormalizableKeys = numLeadingNormalizableKeys;
 		this.normalizableKeyPrefixLen = normalizableKeyPrefixLen;
 		this.invertNormKey = invertNormKey;
-		this.nullMask1 = new boolean[arity];
-		this.nullMask2 = new boolean[arity];
+		this.mask1 = new boolean[ROW_KIND_OFFSET + arity];
+		this.mask2 = new boolean[ROW_KIND_OFFSET + arity];
 		deserializedKeyFields1 = instantiateDeserializationFields();
 		deserializedKeyFields2 = instantiateDeserializationFields();
 	}
@@ -251,43 +257,43 @@ public class RowComparator extends CompositeTypeComparator<Row> {
 
 	@Override
 	public int compareSerialized(
-		DataInputView firstSource,
-		DataInputView secondSource) throws IOException {
-
-		int len = serializers.length;
-		int keyLen = keyPositions.length;
+			DataInputView firstSource,
+			DataInputView secondSource) throws IOException {
+		final int len = serializers.length;
+		final int keyLen = keyPositions.length;
 
-		readIntoNullMask(arity, firstSource, nullMask1);
-		readIntoNullMask(arity, secondSource, nullMask2);
+		// read bitmask
+		readIntoMask(firstSource, mask1);
+		readIntoMask(secondSource, mask2);
 
-		// deserialize
-		for (int i = 0; i < len; i++) {
-			TypeSerializer<Object> serializer = serializers[i];
+		// deserialize fields
+		for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+			final TypeSerializer<Object> serializer = serializers[fieldPos];
 
 			// deserialize field 1
-			if (!nullMask1[i]) {
-				deserializedKeyFields1[i] = serializer.deserialize(
-					deserializedKeyFields1[i],
+			if (!mask1[ROW_KIND_OFFSET + fieldPos]) {
+				deserializedKeyFields1[fieldPos] = serializer.deserialize(
+					deserializedKeyFields1[fieldPos],
 					firstSource);
 			}
 
 			// deserialize field 2
-			if (!nullMask2[i]) {
-				deserializedKeyFields2[i] = serializer.deserialize(
-					deserializedKeyFields2[i],
+			if (!mask2[ROW_KIND_OFFSET + fieldPos]) {
+				deserializedKeyFields2[fieldPos] = serializer.deserialize(
+					deserializedKeyFields2[fieldPos],
 					secondSource);
 			}
 		}
 
 		// compare
-		for (int i = 0; i < keyLen; i++) {
-			int keyPos = keyPositions[i];
-			TypeComparator<Object> comparator = comparators[i];
+		for (int fieldPos = 0; fieldPos < keyLen; fieldPos++) {
+			final int keyPos = keyPositions[fieldPos];
+			final TypeComparator<Object> comparator = comparators[fieldPos];
 
-			boolean isNull1 = nullMask1[keyPos];
-			boolean isNull2 = nullMask2[keyPos];
+			final boolean isNull1 = mask1[ROW_KIND_OFFSET + keyPos];
+			final boolean isNull2 = mask2[ROW_KIND_OFFSET + keyPos];
 
-			int cmp = 0;
+			int cmp;
 			// both values are null -> equality
 			if (isNull1 && isNull2) {
 				cmp = 0;
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 505ce7b..a492457 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -28,35 +28,61 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Arrays;
+import java.util.Objects;
 
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoAndCopyMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.writeMask;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Serializer for {@link Row}.
+ *
+ * <p>It uses the following serialization format:
+ * <pre>
+ *     |bitmask|field|field|....
+ * </pre>
+ * The bitmask serves as a header that consists of {@link #ROW_KIND_OFFSET} bits for encoding the
+ * {@link RowKind} and n bits for whether a field is null. For backwards compatibility, those bits
+ * can be ignored if serializer runs in legacy mode:
+ * <pre>
+ *     bitmask with row kind:  |RK RK F1 F2 ... FN|
+ *     bitmask in legacy mode: |F1 F2 ... FN|
+ * </pre>
  */
 @Internal
 public final class RowSerializer extends TypeSerializer<Row> {
 
-	private static final long serialVersionUID = 1L;
+	public static final int ROW_KIND_OFFSET = 2;
+
+	private static final long serialVersionUID = 2L;
+
+	private final boolean legacyModeEnabled;
+
+	private final int legacyOffset;
 
 	private final TypeSerializer<Object>[] fieldSerializers;
 
 	private final int arity;
 
-	private transient boolean[] nullMask;
+	private transient boolean[] mask;
 
-	@SuppressWarnings("unchecked")
 	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
+		this(fieldSerializers, false);
+	}
+
+	@SuppressWarnings("unchecked")
+	public RowSerializer(TypeSerializer<?>[] fieldSerializers, boolean legacyModeEnabled) {
+		this.legacyModeEnabled = legacyModeEnabled;
+		this.legacyOffset = legacyModeEnabled ? 0 : ROW_KIND_OFFSET;
 		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
 		this.arity = fieldSerializers.length;
-		this.nullMask = new boolean[fieldSerializers.length];
+		this.mask = new boolean[legacyOffset + fieldSerializers.length];
 	}
 
 	@Override
@@ -70,7 +96,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		for (int i = 0; i < fieldSerializers.length; i++) {
 			duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
 		}
-		return new RowSerializer(duplicateFieldSerializers);
+		return new RowSerializer(duplicateFieldSerializers, legacyModeEnabled);
 	}
 
 	@Override
@@ -86,7 +112,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
 			throw new RuntimeException("Row arity of from does not match serializers.");
 		}
 
-		Row result = new Row(len);
+		Row result = new Row(from.getKind(), len);
 		for (int i = 0; i < len; i++) {
 			Object fromField = from.getField(i);
 			if (fromField != null) {
@@ -114,6 +140,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 				"Row arity of reuse or from is incompatible with this RowSerializer.");
 		}
 
+		reuse.setKind(from.getKind());
+
 		for (int i = 0; i < len; i++) {
 			Object fromField = from.getField(i);
 			if (fromField != null) {
@@ -145,39 +173,42 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	@Override
 	public void serialize(Row record, DataOutputView target) throws IOException {
-		int len = fieldSerializers.length;
+		final int len = fieldSerializers.length;
 
 		if (record.getArity() != len) {
 			throw new RuntimeException("Row arity of from does not match serializers.");
 		}
 
-		// write a null mask
-		writeNullMask(len, record, target);
+		// write bitmask
+		fillMask(len, record, mask, legacyModeEnabled, legacyOffset);
+		writeMask(mask, target);
 
 		// serialize non-null fields
-		for (int i = 0; i < len; i++) {
-			Object o = record.getField(i);
+		for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+			final Object o = record.getField(fieldPos);
 			if (o != null) {
-				fieldSerializers[i].serialize(o, target);
+				fieldSerializers[fieldPos].serialize(o, target);
 			}
 		}
 	}
 
 	@Override
 	public Row deserialize(DataInputView source) throws IOException {
-		int len = fieldSerializers.length;
-
-		Row result = new Row(len);
-
-		// read null mask
-		readIntoNullMask(len, source, nullMask);
+		final int len = fieldSerializers.length;
+
+		// read bitmask
+		readIntoMask(source, mask);
+		final Row result;
+		if (legacyModeEnabled) {
+			result = new Row(len);
+		} else {
+			result = new Row(readKindFromMask(mask), len);
+		}
 
-		for (int i = 0; i < len; i++) {
-			if (nullMask[i]) {
-				result.setField(i, null);
-			}
-			else {
-				result.setField(i, fieldSerializers[i].deserialize(source));
+		// deserialize fields
+		for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+			if (!mask[legacyOffset + fieldPos]) {
+				result.setField(fieldPos, fieldSerializers[fieldPos].deserialize(source));
 			}
 		}
 
@@ -186,26 +217,29 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	@Override
 	public Row deserialize(Row reuse, DataInputView source) throws IOException {
-		int len = fieldSerializers.length;
+		final int len = fieldSerializers.length;
 
 		if (reuse.getArity() != len) {
 			throw new RuntimeException("Row arity of from does not match serializers.");
 		}
 
-		// read null mask
-		readIntoNullMask(len, source, nullMask);
+		// read bitmask
+		readIntoMask(source, mask);
+		if (!legacyModeEnabled) {
+			reuse.setKind(readKindFromMask(mask));
+		}
 
-		for (int i = 0; i < len; i++) {
-			if (nullMask[i]) {
-				reuse.setField(i, null);
-			}
-			else {
-				Object reuseField = reuse.getField(i);
+		// deserialize fields
+		for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+			if (mask[legacyOffset + fieldPos]) {
+				reuse.setField(fieldPos, null);
+			} else {
+				Object reuseField = reuse.getField(fieldPos);
 				if (reuseField != null) {
-					reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
+					reuse.setField(fieldPos, fieldSerializers[fieldPos].deserialize(reuseField, source));
 				}
 				else {
-					reuse.setField(i, fieldSerializers[i].deserialize(source));
+					reuse.setField(fieldPos, fieldSerializers[fieldPos].deserialize(source));
 				}
 			}
 		}
@@ -217,43 +251,68 @@ public final class RowSerializer extends TypeSerializer<Row> {
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		int len = fieldSerializers.length;
 
-		// copy null mask
-		readIntoAndCopyNullMask(len, source, target, nullMask);
+		// copy bitmask
+		readIntoAndCopyMask(source, target, mask);
 
-		for (int i = 0; i < len; i++) {
-			if (!nullMask[i]) {
-				fieldSerializers[i].copy(source, target);
+		// copy non-null fields
+		for (int fieldPos = 0; fieldPos < len; fieldPos++) {
+			if (!mask[legacyOffset + fieldPos]) {
+				fieldSerializers[fieldPos].copy(source, target);
 			}
 		}
 	}
 
 	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof RowSerializer) {
-			RowSerializer other = (RowSerializer) obj;
-			if (this.fieldSerializers.length == other.fieldSerializers.length) {
-				for (int i = 0; i < this.fieldSerializers.length; i++) {
-					if (!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
-						return false;
-					}
-				}
-				return true;
-			}
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
 		}
-
-		return false;
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		RowSerializer that = (RowSerializer) o;
+		return legacyModeEnabled == that.legacyModeEnabled &&
+			Arrays.equals(fieldSerializers, that.fieldSerializers);
 	}
 
 	@Override
 	public int hashCode() {
-		return Arrays.hashCode(fieldSerializers);
+		int result = Objects.hash(legacyModeEnabled);
+		result = 31 * result + Arrays.hashCode(fieldSerializers);
+		return result;
 	}
 
 	// --------------------------------------------------------------------------------------------
 
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		this.nullMask = new boolean[fieldSerializers.length];
+		this.mask = new boolean[legacyOffset + fieldSerializers.length];
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serialization utilities
+	// --------------------------------------------------------------------------------------------
+
+	private static void fillMask(
+			int fieldLength,
+			Row row,
+			boolean[] mask,
+			boolean legacyModeEnabled,
+			int legacyOffset) {
+		if (!legacyModeEnabled) {
+			final byte kind = row.getKind().toByteValue();
+			mask[0] = (kind & 0x01) > 0;
+			mask[1] = (kind & 0x02) > 0;
+		}
+
+		for (int fieldPos = 0; fieldPos < fieldLength; fieldPos++) {
+			mask[legacyOffset + fieldPos] = row.getField(fieldPos) == null;
+		}
+	}
+
+	private static RowKind readKindFromMask(boolean[] mask) {
+		final byte kind = (byte) ((mask[0] ? 0x01 : 0x00) + (mask[1] ? 0x02 : 0x00));
+		return RowKind.fromByteValue(kind);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -282,7 +341,7 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		public RowSerializerConfigSnapshot() {
 		}
 
-		public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
+		public RowSerializerConfigSnapshot(TypeSerializer<?>[] fieldSerializers) {
 			super(fieldSerializers);
 		}
 
@@ -308,11 +367,15 @@ public final class RowSerializer extends TypeSerializer<Row> {
 	/**
 	 * A {@link TypeSerializerSnapshot} for RowSerializer.
 	 */
+	// TODO not fully functional yet due to FLINK-17520
 	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
-		private static final int VERSION = 2;
+		private static final int VERSION = 3;
+
+		private static final int VERSION_WITHOUT_ROW_KIND = 2;
+
+		private boolean legacyModeEnabled = false;
 
-		@SuppressWarnings("WeakerAccess")
 		public RowSerializerSnapshot() {
 			super(RowSerializer.class);
 		}
@@ -327,13 +390,23 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		}
 
 		@Override
+		protected void readOuterSnapshot(
+				int readOuterSnapshotVersion,
+				DataInputView in,
+				ClassLoader userCodeClassLoader) {
+			if (readOuterSnapshotVersion == VERSION_WITHOUT_ROW_KIND) {
+				legacyModeEnabled = true;
+			}
+		}
+
+		@Override
 		protected TypeSerializer<?>[] getNestedSerializers(RowSerializer outerSerializer) {
 			return outerSerializer.fieldSerializers;
 		}
 
 		@Override
 		protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
-			return new RowSerializer(nestedSerializers);
+			return new RowSerializer(nestedSerializers, legacyModeEnabled);
 		}
 	}
 }
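
Putting the RowSerializer changes together: in non-legacy mode the mask has ROW_KIND_OFFSET (two) leading entries that spell out the RowKind's byte value bit by bit, followed by one null flag per field; legacy mode simply drops the two kind bits. A standalone sketch mirroring fillMask and readKindFromMask above (it assumes, as the code does, that RowKind byte values fit into two bits):

    import java.util.Arrays;

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class RowHeaderSketch {

        static final int ROW_KIND_OFFSET = 2;

        // Two kind bits first, then one null flag per field.
        static boolean[] toMask(Row row) {
            final boolean[] mask = new boolean[ROW_KIND_OFFSET + row.getArity()];
            final byte kind = row.getKind().toByteValue();
            mask[0] = (kind & 0x01) > 0;
            mask[1] = (kind & 0x02) > 0;
            for (int pos = 0; pos < row.getArity(); pos++) {
                mask[ROW_KIND_OFFSET + pos] = row.getField(pos) == null;
            }
            return mask;
        }

        // Inverse of the kind encoding.
        static RowKind kindFromMask(boolean[] mask) {
            final byte kind = (byte) ((mask[0] ? 0x01 : 0x00) + (mask[1] ? 0x02 : 0x00));
            return RowKind.fromByteValue(kind);
        }

        public static void main(String[] args) {
            final Row row = Row.ofKind(RowKind.DELETE, 1, null, "a");
            final boolean[] mask = toMask(row);
            System.out.println(kindFromMask(mask));  // DELETE
            System.out.println(Arrays.toString(Arrays.copyOfRange(mask, ROW_KIND_OFFSET, mask.length)));
            // -> [false, true, false]   (only the middle field is null)
        }
    }
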
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index aa15bf9..29ec058 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -18,68 +18,118 @@
 package org.apache.flink.types;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Arrays;
 
 /**
- * A Row can have arbitrary number of fields and contain a set of fields, which may all be
- * different types. The fields in Row can be null. Due to Row is not strongly typed, Flink's
- * type extraction mechanism can't extract correct field types. So that users should manually
- * tell Flink the type information via creating a {@link RowTypeInfo}.
+ * A row is a fixed-length, null-aware composite type for storing multiple values in a deterministic
+ * field order. Every field can be null regardless of the field's type. The type of row fields cannot
+ * be automatically inferred; therefore, it is required to provide type information whenever a row is
+ * produced.
+ *
+ * <p>The main purpose of rows is to bridge between Flink's Table and SQL ecosystem and other APIs. Therefore,
+ * a row does not only consist of a schema part (containing the fields) but also attaches a {@link RowKind}
+ * for encoding a change in a changelog. Thus, a row can be considered as an entry in a changelog. For example,
+ * in regular batch scenarios, a changelog would consist of a bounded stream of {@link RowKind#INSERT} rows.
  *
- * <p>
- * The fields in the Row can be accessed by position (zero-based) {@link #getField(int)}. And can
- * set fields by {@link #setField(int, Object)}.
- * <p>
- * Row is in principle serializable. However, it may contain non-serializable fields,
- * in which case serialization will fail.
+ * <p>The fields of a row can be accessed by position (zero-based) using {@link #getField(int)} and
+ * {@link #setField(int, Object)}. The row kind is kept separate from the fields and can be accessed
+ * by using {@link #getKind()} and {@link #setKind(RowKind)}.
  *
+ * <p>A row instance is in principle {@link Serializable}. However, it may contain non-serializable fields
+ * in which case serialization will fail if the row is not serialized with Flink's serialization stack.
  */
 @PublicEvolving
-public class Row implements Serializable{
+public final class Row implements Serializable {
 
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 2L;
+
+	/** The kind of change a row describes in a changelog. */
+	private RowKind kind;
 
 	/** The array to store actual values. */
 	private final Object[] fields;
 
 	/**
-	 * Create a new Row instance.
-	 * @param arity The number of fields in the Row
+	 * Create a new row instance.
+	 *
+	 * <p>By default, a row describes an {@link RowKind#INSERT} change.
+	 *
+	 * @param kind kind of change a row describes in a changelog
+	 * @param arity The number of fields in the row.
 	 */
-	public Row(int arity) {
+	public Row(RowKind kind, int arity) {
+		this.kind = Preconditions.checkNotNull(kind, "Row kind must not be null.");
 		this.fields = new Object[arity];
 	}
 
 	/**
-	 * Get the number of fields in the Row.
-	 * @return The number of fields in the Row.
+	 * Create a new row instance.
+	 *
+	 * <p>By default, a row describes an {@link RowKind#INSERT} change.
+	 *
+	 * @param arity The number of fields in the row.
+	 */
+	public Row(int arity) {
+		this(RowKind.INSERT, arity);
+	}
+
+	/**
+	 * Returns the kind of change that this row describes in a changelog.
+	 *
+	 * <p>By default, a row describes an {@link RowKind#INSERT} change.
+	 *
+	 * @see RowKind
+	 */
+	public RowKind getKind() {
+		return kind;
+	}
+
+	/**
+	 * Sets the kind of change that this row describes in a changelog.
+	 *
+	 * <p>By default, a row describes an {@link RowKind#INSERT} change.
+	 *
+	 * @see RowKind
+	 */
+	public void setKind(RowKind kind) {
+		Preconditions.checkNotNull(kind, "Row kind must not be null.");
+		this.kind = kind;
+	}
+
+	/**
+	 * Returns the number of fields in the row.
+	 *
+	 * <p>Note: The row kind is kept separate from the fields and is not included in this number.
+	 *
+	 * @return The number of fields in the row.
 	 */
 	public int getArity() {
 		return fields.length;
 	}
 
 	/**
-	 * Gets the field at the specified position.
+	 * Returns the field's content at the specified position.
+	 *
 	 * @param pos The position of the field, 0-based.
-	 * @return The field at the specified position.
-	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
+	 * @return The field's content at the specified position.
 	 */
-	public Object getField(int pos) {
+	public @Nullable Object getField(int pos) {
 		return fields[pos];
 	}
 
 	/**
-	 * Sets the field at the specified position.
+	 * Sets the field's content at the specified position.
 	 *
 	 * @param pos The position of the field, 0-based.
 	 * @param value The value to be assigned to the field at the specified position.
-	 * @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields.
 	 */
-	public void setField(int pos, Object value) {
+	public void setField(int pos, @Nullable Object value) {
 		fields[pos] = value;
 	}
 
@@ -103,25 +153,29 @@ public class Row implements Serializable{
 		if (o == null || getClass() != o.getClass()) {
 			return false;
 		}
-
 		Row row = (Row) o;
-
-		return Arrays.deepEquals(fields, row.fields);
+		return kind == row.kind &&
+			Arrays.deepEquals(fields, row.fields);
 	}
 
 	@Override
 	public int hashCode() {
-		return Arrays.deepHashCode(fields);
+		int result = kind.toByteValue(); // for stable hash across JVM instances
+		result = 31 * result + Arrays.deepHashCode(fields);
+		return result;
 	}
 
+	// --------------------------------------------------------------------------------------------
+	// Utility methods
+	// --------------------------------------------------------------------------------------------
+
 	/**
-	 * Creates a new Row and assigns the given values to the Row's fields.
+	 * Creates a new row and assigns the given values to the row's fields.
 	 * This is more convenient than using the constructor.
 	 *
 	 * <p>For example:
-	 *
 	 * <pre>
-	 *     Row.of("hello", true, 1L);}
+	 *     Row.of("hello", true, 1L);
 	 * </pre>
 	 * instead of
 	 * <pre>
@@ -131,6 +185,7 @@ public class Row implements Serializable{
 	 *     row.setField(2, 1L);
 	 * </pre>
 	 *
+	 * <p>By default, a row describes an {@link RowKind#INSERT} change.
 	 */
 	public static Row of(Object... values) {
 		Row row = new Row(values.length);
@@ -141,27 +196,50 @@ public class Row implements Serializable{
 	}
 
 	/**
-	 * Creates a new Row which copied from another row.
-	 * This method does not perform a deep copy.
+	 * Creates a new row with given kind and assigns the given values to the row's fields.
+	 * This is more convenient than using the constructor.
+	 *
+	 * <p>For example:
+	 * <pre>
+	 *     Row.ofKind(RowKind.INSERT, "hello", true, 1L);
+	 * </pre>
+	 * instead of
+	 * <pre>
+	 *     Row row = new Row(3);
+	 *     row.setKind(RowKind.INSERT);
+	 *     row.setField(0, "hello");
+	 *     row.setField(1, true);
+	 *     row.setField(2, 1L);
+	 * </pre>
+	 */
+	public static Row ofKind(RowKind kind, Object... values) {
+		Row row = new Row(kind, values.length);
+		for (int i = 0; i < values.length; i++) {
+			row.setField(i, values[i]);
+		}
+		return row;
+	}
+
+	/**
+	 * Creates a new row which is copied from another row (including its {@link RowKind}).
 	 *
-	 * @param row The row being copied.
-	 * @return The cloned new Row
+	 * <p>This method does not perform a deep copy.
 	 */
 	public static Row copy(Row row) {
-		final Row newRow = new Row(row.fields.length);
+		final Row newRow = new Row(row.kind, row.fields.length);
 		System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
 		return newRow;
 	}
 
 	/**
-	 * Creates a new Row with projected fields from another row.
-	 * This method does not perform a deep copy.
+	 * Creates a new row with projected fields and identical {@link RowKind} from another row.
+	 *
+	 * <p>This method does not perform a deep copy.
 	 *
-	 * @param fields fields to be projected
-	 * @return the new projected Row
+	 * @param fields field indices to be projected
 	 */
 	public static Row project(Row row, int[] fields) {
-		final Row newRow = new Row(fields.length);
+		final Row newRow = new Row(row.kind, fields.length);
 		for (int i = 0; i < fields.length; i++) {
 			newRow.fields[i] = row.fields[fields[i]];
 		}
@@ -169,12 +247,11 @@ public class Row implements Serializable{
 	}
 
 	/**
-	 * Creates a new Row which fields are copied from the other rows.
-	 * This method does not perform a deep copy.
+	 * Creates a new row with fields that are copied from the other rows and appended to the resulting
+	 * row in the given order. The {@link RowKind} of the first row determines the {@link RowKind} of
+	 * the result.
 	 *
-	 * @param first The first row being copied.
-	 * @param remainings The other rows being copied.
-	 * @return the joined new Row
+	 * <p>This method does not perform a deep copy.
 	 */
 	public static Row join(Row first, Row... remainings) {
 		int newLength = first.fields.length;
@@ -182,7 +259,7 @@ public class Row implements Serializable{
 			newLength += remaining.fields.length;
 		}
 
-		final Row joinedRow = new Row(newLength);
+		final Row joinedRow = new Row(first.kind, newLength);
 		int index = 0;
 
 		// copy the first row
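
The reworked static helpers on Row propagate the change flag as described in the updated Javadoc: copy and project keep the kind of their input row, while join adopts the kind of the first row. For example:

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class RowHelpersSketch {
        public static void main(String[] args) {
            final Row delete = Row.ofKind(RowKind.DELETE, "a", 1, true);
            final Row insert = Row.ofKind(RowKind.INSERT, 2L);

            System.out.println(Row.copy(delete).getKind());                      // DELETE
            System.out.println(Row.project(delete, new int[]{2, 0}).getKind());  // DELETE
            System.out.println(Row.join(delete, insert).getKind());              // DELETE (first row wins)
        }
    }
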
diff --git a/flink-core/src/main/java/org/apache/flink/types/RowKind.java b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
index a1acf7d..eddc1b2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/RowKind.java
+++ b/flink-core/src/main/java/org/apache/flink/types/RowKind.java
@@ -26,6 +26,9 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public enum RowKind {
 
+	// Note: Enums have no stable hash code across different JVMs, use toByteValue() for
+	// this purpose.
+
 	/**
 	 * Insertion operation.
 	 */
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
similarity index 89%
copy from flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
copy to flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
index f949431..5c78b85 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/LegacyRowSerializerTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -31,12 +30,16 @@ import org.apache.flink.types.Row;
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.util.Objects;
 
-public class RowSerializerTest {
+/**
+ * Tests for the old serialization format of {@link Row} before Flink 1.11.
+ */
+public class LegacyRowSerializerTest {
 
 	@Test
 	public void testRowSerializer() {
-		TypeInformation<Row> typeInfo = new RowTypeInfo(
+		RowTypeInfo typeInfo = new RowTypeInfo(
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO);
 		Row row1 = new Row(2);
@@ -47,14 +50,14 @@ public class RowSerializerTest {
 		row2.setField(0, 2);
 		row2.setField(1, null);
 
-		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
 		RowSerializerTestInstance instance = new RowSerializerTestInstance(serializer, row1, row2);
 		instance.testAll();
 	}
 
 	@Test
 	public void testLargeRowSerializer() {
-		TypeInformation<Row> typeInfo = new RowTypeInfo(
+		RowTypeInfo typeInfo = new RowTypeInfo(
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.INT_TYPE_INFO,
@@ -83,14 +86,14 @@ public class RowSerializerTest {
 		row.setField(11, null);
 		row.setField(12, "Test");
 
-		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
 		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, row);
 		testInstance.testAll();
 	}
 
 	@Test
 	public void testRowSerializerWithComplexTypes() {
-		TypeInformation<Row> typeInfo = new RowTypeInfo(
+		RowTypeInfo typeInfo = new RowTypeInfo(
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.DOUBLE_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO,
@@ -124,7 +127,7 @@ public class RowSerializerTest {
 			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
 		};
 
-		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
+		TypeSerializer<Row> serializer = typeInfo.createLegacySerializer(new ExecutionConfig());
 		RowSerializerTestInstance testInstance = new RowSerializerTestInstance(serializer, data);
 		testInstance.testAll();
 	}
@@ -141,7 +144,6 @@ public class RowSerializerTest {
 		return row;
 	}
 
-
 	private class RowSerializerTestInstance extends SerializerTestInstance<Row> {
 
 		RowSerializerTestInstance(
@@ -175,11 +177,8 @@ public class RowSerializerTest {
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-
-			MyPojo myPojo = (MyPojo) o;
-
-			return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+			final MyPojo myPojo = (MyPojo) o;
+			return Objects.equals(name, myPojo.name);
 		}
-
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
index ca54bd4..471c692 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowComparatorTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
 import org.junit.BeforeClass;
 
 import java.io.Serializable;
@@ -51,20 +53,20 @@ public class RowComparatorTest extends ComparatorTestBase<Row> {
 	private static MyPojo testPojo3 = new MyPojo();
 
 	private static final Row[] data = new Row[]{
-		createRow(null, null, null, null, null),
-		createRow(0, null, null, null, null),
-		createRow(0, 0.0, null, null, null),
-		createRow(0, 0.0, "a", null, null),
-		createRow(1, 0.0, "a", null, null),
-		createRow(1, 1.0, "a", null, null),
-		createRow(1, 1.0, "b", null, null),
-		createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
-		createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
-		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
-		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
-		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
-		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
-		createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+		createRow(RowKind.INSERT, null, null, null, null, null),
+		createRow(RowKind.INSERT, 0, null, null, null, null),
+		createRow(RowKind.INSERT, 0, 0.0, null, null, null),
+		createRow(RowKind.INSERT, 0, 0.0, "a", null, null),
+		createRow(RowKind.INSERT, 1, 0.0, "a", null, null),
+		createRow(RowKind.INSERT, 1, 1.0, "a", null, null),
+		createRow(RowKind.INSERT, 1, 1.0, "b", null, null),
+		createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+		createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+		createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+		createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+		createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+		createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+		createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
 	};
 
 	@BeforeClass
@@ -110,8 +112,8 @@ public class RowComparatorTest extends ComparatorTestBase<Row> {
 		return true;
 	}
 
-	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
-		Row row = new Row(5);
+	private static Row createRow(RowKind kind, Object f0, Object f1, Object f2, Object f3, Object f4) {
+		Row row = new Row(kind, 5);
 		row.setField(0, f0);
 		row.setField(1, f1);
 		row.setField(2, f2);
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
index 7a7888a..1aacd4f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSn
 import org.apache.flink.testutils.migration.MigrationVersion;
 import org.apache.flink.types.Row;
 
+import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -35,6 +36,7 @@ import java.util.Collection;
 /**
  * State migration test for {@link RowSerializer}.
  */
+@Ignore
 @RunWith(Parameterized.class)
 public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Row> {
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
index f949431..d117974 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerTest.java
@@ -27,10 +27,12 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public class RowSerializerTest {
 
@@ -40,10 +42,12 @@ public class RowSerializerTest {
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO);
 		Row row1 = new Row(2);
+		row1.setKind(RowKind.UPDATE_BEFORE);
 		row1.setField(0, 1);
 		row1.setField(1, "a");
 
 		Row row2 = new Row(2);
+		row2.setKind(RowKind.INSERT);
 		row2.setField(0, 2);
 		row2.setField(1, null);
 
@@ -108,20 +112,20 @@ public class RowSerializerTest {
 		testPojo3.name = "Test2";
 
 		Row[] data = new Row[]{
-			createRow(null, null, null, null, null),
-			createRow(0, null, null, null, null),
-			createRow(0, 0.0, null, null, null),
-			createRow(0, 0.0, "a", null, null),
-			createRow(1, 0.0, "a", null, null),
-			createRow(1, 1.0, "a", null, null),
-			createRow(1, 1.0, "b", null, null),
-			createRow(1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
-			createRow(1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
-			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
-			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
-			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
-			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
-			createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
+			createRow(RowKind.INSERT, null, null, null, null, null),
+			createRow(RowKind.INSERT, 0, null, null, null, null),
+			createRow(RowKind.INSERT, 0, 0.0, null, null, null),
+			createRow(RowKind.INSERT, 0, 0.0, "a", null, null),
+			createRow(RowKind.INSERT, 1, 0.0, "a", null, null),
+			createRow(RowKind.INSERT, 1, 1.0, "a", null, null),
+			createRow(RowKind.INSERT, 1, 1.0, "b", null, null),
+			createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(1, false, (short) 2), null),
+			createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, false, (short) 2), null),
+			createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 2), null),
+			createRow(RowKind.UPDATE_AFTER, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), null),
+			createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo1),
+			createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo2),
+			createRow(RowKind.DELETE, 1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3)
 		};
 
 		TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());
@@ -131,8 +135,8 @@ public class RowSerializerTest {
 
 	// ----------------------------------------------------------------------------------------------
 
-	private static Row createRow(Object f0, Object f1, Object f2, Object f3, Object f4) {
-		Row row = new Row(5);
+	private static Row createRow(RowKind kind, Object f0, Object f1, Object f2, Object f3, Object f4) {
+		Row row = new Row(kind, 5);
 		row.setField(0, f0);
 		row.setField(1, f1);
 		row.setField(2, f2);
@@ -175,10 +179,8 @@ public class RowSerializerTest {
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-
-			MyPojo myPojo = (MyPojo) o;
-
-			return name != null ? name.equals(myPojo.name) : myPojo.name == null;
+			final MyPojo myPojo = (MyPojo) o;
+			return Objects.equals(name, myPojo.name);
 		}
 
 	}
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
index c919eb7..1ebcc0a 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java
@@ -292,7 +292,7 @@ public final class PythonTypeUtils {
 				.stream()
 				.map(f -> f.getType().accept(this))
 				.toArray(TypeSerializer[]::new);
-			return new RowSerializer(fieldTypeSerializers);
+			return new RowSerializer(fieldTypeSerializers, true);
 		}
 
 		@Override
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
index 68d5250..1fe8115 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.InstantiationUtil;
 import java.io.IOException;
 import java.util.Arrays;
 
-import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
+import static org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask;
 
 /**
  * A {@link TypeSerializer} for {@link RowData}. It should be noted that the header will not be encoded.
@@ -55,7 +55,7 @@ public class RowDataSerializer extends org.apache.flink.table.runtime.typeutils.
 		super(types, fieldSerializers);
 		this.fieldTypes = types;
 		this.fieldSerializers = fieldSerializers;
-		this.nullMask = new boolean[fieldTypes.length];
+		this.nullMask = new boolean[fieldSerializers.length];
 	}
 
 	@Override
@@ -79,10 +79,8 @@ public class RowDataSerializer extends org.apache.flink.table.runtime.typeutils.
 
 	@Override
 	public RowData deserialize(DataInputView source) throws IOException {
-		int len = fieldSerializers.length;
-
 		// read null mask
-		readIntoNullMask(len, source, nullMask);
+		readIntoMask(source, nullMask);
 
 		GenericRowData row = new GenericRowData(fieldSerializers.length);
 		for (int i = 0; i < row.getArity(); i++) {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
index 46f2ada..9b59650 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/RowArrowSourceFunctionTest.java
@@ -52,7 +52,7 @@ public class RowArrowSourceFunctionTest extends ArrowSourceFunctionTestBase<Row>
 
 	public RowArrowSourceFunctionTest() {
 		super(VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator),
-			new RowSerializer(new TypeSerializer[]{StringSerializer.INSTANCE}),
+			new RowSerializer(new TypeSerializer[]{StringSerializer.INSTANCE}, true),
 			Comparator.comparing(o -> (String) (o.getField(0))));
 	}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
index 411e65e..4687785 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java
@@ -54,6 +54,21 @@ public final class GenericRowData implements RowData {
 	private RowKind kind;
 
 	/**
+	 * Creates an instance of {@link GenericRowData} with given kind and number of fields.
+	 *
+	 * <p>Initially, all fields are set to null.
+	 *
+	 * <p>Note: All fields of the row must be internal data structures.
+	 *
+	 * @param kind kind of change that this row describes in a changelog
+	 * @param arity number of fields
+	 */
+	public GenericRowData(RowKind kind, int arity) {
+		this.fields = new Object[arity];
+		this.kind = kind;
+	}
+
+	/**
 	 * Creates an instance of {@link GenericRowData} with given number of fields.
 	 *
 	 * <p>Initially, all fields are set to null. By default, the row describes a {@link RowKind#INSERT}
@@ -244,4 +259,19 @@ public final class GenericRowData implements RowData {
 
 		return row;
 	}
+
+	/**
+	 * Creates an instance of {@link GenericRowData} with given kind and field values.
+	 *
+	 * <p>Note: All fields of the row must be internal data structures.
+	 */
+	public static GenericRowData ofKind(RowKind kind, Object... values) {
+		GenericRowData row = new GenericRowData(kind, values.length);
+
+		for (int i = 0; i < values.length; ++i) {
+			row.setField(i, values[i]);
+		}
+
+		return row;
+	}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
index 468efcd..c899ff3 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/conversion/RowRowConverter.java
@@ -57,7 +57,7 @@ class RowRowConverter implements DataStructureConverter<RowData, Row> {
 	@Override
 	public RowData toInternal(Row external) {
 		final int length = fieldConverters.length;
-		final GenericRowData genericRow = new GenericRowData(length);
+		final GenericRowData genericRow = new GenericRowData(external.getKind(), length);
 		for (int pos = 0; pos < length; pos++) {
 			final Object value = external.getField(pos);
 			genericRow.setField(pos, fieldConverters[pos].toInternalOrNull(value));
@@ -68,7 +68,7 @@ class RowRowConverter implements DataStructureConverter<RowData, Row> {
 	@Override
 	public Row toExternal(RowData internal) {
 		final int length = fieldConverters.length;
-		final Row row = new Row(length);
+		final Row row = new Row(internal.getRowKind(), length);
 		for (int pos = 0; pos < length; pos++) {
 			final Object value = fieldGetters[pos].getFieldOrNull(internal);
 			row.setField(pos, fieldConverters[pos].toExternalOrNull(value));
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 1f1089c..347a517 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -1411,7 +1411,7 @@ public class DataFormatConverters {
 
 		@Override
 		RowData toInternalImpl(Row value) {
-			GenericRowData genericRow = new GenericRowData(converters.length);
+			GenericRowData genericRow = new GenericRowData(value.getKind(), converters.length);
 			for (int i = 0; i < converters.length; i++) {
 				genericRow.setField(i, converters[i].toInternal(value.getField(i)));
 			}
@@ -1420,7 +1420,7 @@ public class DataFormatConverters {
 
 		@Override
 		Row toExternalImpl(RowData value) {
-			Row row = new Row(converters.length);
+			Row row = new Row(value.getRowKind(), converters.length);
 			for (int i = 0; i < converters.length; i++) {
 				row.setField(i, converters[i].toExternal(value, i));
 			}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
index 4ca0f1c..82e33d1 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -169,7 +170,7 @@ public class DataFormatConvertersTest {
 			test(simpleTypes[i], simpleValues[i]);
 		}
 		test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length));
-		test(new RowTypeInfo(simpleTypes), Row.of(simpleValues));
+		test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues));
 		test(new RowDataTypeInfo(new VarCharType(VarCharType.MAX_LENGTH), new IntType()),
 				GenericRowData.of(StringData.fromString("hehe"), 111));
 		test(new RowDataTypeInfo(new VarCharType(VarCharType.MAX_LENGTH), new IntType()), GenericRowData.of(null, null));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
index 8af45a4..a74517c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeFactoryMock;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.Rule;
@@ -235,8 +236,8 @@ public class DataStructureConvertersTest {
 							ROW(
 								FIELD("b_1", DOUBLE()),
 								FIELD("b_2", BOOLEAN())))))
-				.convertedTo(Row.class, Row.of(12, Row.of(2.0, null)))
-				.convertedTo(RowData.class, GenericRowData.of(12, GenericRowData.of(2.0, null))),
+				.convertedTo(Row.class, Row.ofKind(RowKind.DELETE, 12, Row.of(2.0, null)))
+				.convertedTo(RowData.class, GenericRowData.ofKind(RowKind.DELETE, 12, GenericRowData.of(2.0, null))),
 
 			TestSpec
 				.forDataType(