You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:11 UTC
[flink] 06/18: [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b22e91358fd2e58fc064968953a73218e87cfc88
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 15:27:50 2019 +0100
[FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API
---
.../java/typeutils/runtime/NullableSerializer.java | 179 +++++++++++++++------
.../TypeSerializerSnapshotMigrationTestBase.java | 63 +++++++-
.../runtime/NullableSerializerMigrationTest.java | 83 ++++++++++
.../flink-1.6-nullable-not-padded-serializer-data | Bin 0 -> 58 bytes
...ink-1.6-nullable-not-padded-serializer-snapshot | Bin 0 -> 944 bytes
.../flink-1.6-nullable-padded-serializer-data | Bin 0 -> 90 bytes
.../flink-1.6-nullable-padded-serializer-snapshot | Bin 0 -> 952 bytes
.../flink-1.7-nullable-not-padded-serializer-data | Bin 0 -> 58 bytes
...ink-1.7-nullable-not-padded-serializer-snapshot | Bin 0 -> 941 bytes
.../flink-1.7-nullable-padded-serializer-data | Bin 0 -> 90 bytes
.../flink-1.7-nullable-padded-serializer-snapshot | Bin 0 -> 949 bytes
11 files changed, 270 insertions(+), 55 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index fe392e4..05941fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -19,24 +19,23 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.io.IOException;
-import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* Serializer wrapper to add support of {@code null} value serialization.
@@ -65,12 +64,15 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
private final byte[] padding;
private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
- this.originalSerializer = originalSerializer;
- this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);
+ this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen));
+ }
+ private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, byte[] padding) {
+ this.originalSerializer = originalSerializer;
+ this.padding = padding;
}
- private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
+ private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
}
@@ -79,7 +81,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
* This method tries to serialize {@code null} value with the {@code originalSerializer}
* and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}.
*
- * @param originalSerializer serializer to wrap and add {@code null} support
+ * @param originalSerializer serializer to wrap and add {@code null} support
* @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
* @return serializer which supports {@code null} values
*/
@@ -99,22 +101,24 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
DataOutputSerializer dos = new DataOutputSerializer(length);
try {
serializer.serialize(null, dos);
- } catch (IOException | RuntimeException e) {
+ }
+ catch (IOException | RuntimeException e) {
return false;
}
- Preconditions.checkArgument(
+ checkArgument(
serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
"The serialized form of the null value should have the same length " +
"as any other if the length is fixed in the serializer");
DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
try {
- Preconditions.checkArgument(serializer.deserialize(dis) == null);
- } catch (IOException e) {
+ checkArgument(serializer.deserialize(dis) == null);
+ }
+ catch (IOException e) {
throw new RuntimeException(
String.format("Unexpected failure to deserialize just serialized null value with %s",
serializer.getClass().getName()), e);
}
- Preconditions.checkArgument(
+ checkArgument(
serializer.copy(null) == null,
"Serializer %s has to be able properly copy null value if it can serialize it",
serializer.getClass().getName());
@@ -125,10 +129,18 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
return padding.length > 0;
}
+ private int nullPaddingLength() {
+ return padding.length;
+ }
+
+ private TypeSerializer<T> originalSerializer() {
+ return originalSerializer;
+ }
+
/**
* This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
*
- * @param originalSerializer serializer to wrap and add {@code null} support
+ * @param originalSerializer serializer to wrap and add {@code null} support
* @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
* @return wrapped serializer which supports {@code null} values
*/
@@ -176,7 +188,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
if (record == null) {
target.writeBoolean(true);
target.write(padding);
- } else {
+ }
+ else {
target.writeBoolean(false);
originalSerializer.serialize(record, target);
}
@@ -209,7 +222,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
target.writeBoolean(isNull);
if (isNull) {
target.write(padding);
- } else {
+ }
+ else {
originalSerializer.copy(source, target);
}
}
@@ -233,45 +247,29 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
}
@Override
- public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new NullableSerializerConfigSnapshot<>(originalSerializer);
+ public TypeSerializerSnapshot<T> snapshotConfiguration() {
+ return new NullableSerializerSnapshot<>(this);
}
- @Override
- public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
- List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
- ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
- CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousKvSerializersAndConfigs.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- previousKvSerializersAndConfigs.get(0).f1,
- originalSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new NullableSerializer<>(
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
- }
- }
-
- return CompatibilityResult.requiresMigration();
- }
/**
* Configuration snapshot for serializers of nullable types, containing the
* configuration snapshot of its original serializer.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for
+ * backwards compatibility purposes. It is fully replaced
+ * by {@link NullableSerializerSnapshot}.
*/
+ @Deprecated
@Internal
- public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+ public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<T> {
private static final int VERSION = 1;
- /** This empty nullary constructor is required for deserializing the configuration. */
- @SuppressWarnings("unused")
- public NullableSerializerConfigSnapshot() {}
+ /**
+ * This empty nullary constructor is required for deserializing the configuration.
+ */
+ public NullableSerializerConfigSnapshot() {
+ }
NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
super(originalSerializer);
@@ -281,5 +279,88 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
public int getVersion() {
return VERSION;
}
+
+ @Override
+ public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+ NullableSerializer<T> previousSerializer = (NullableSerializer<T>) restoreSerializer();
+ NullableSerializerSnapshot<T> newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength());
+
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ newCompositeSnapshot,
+ getSingleNestedSerializerAndConfig().f1
+ );
+ }
}
+
+	/**
+	 * Snapshot for serializers of nullable types, containing the
+	 * snapshot of its original serializer.
+	 *
+	 * <p>In addition to the nested serializer snapshot, the outer snapshot stores the length
+	 * of the padding that is written for a {@code null} value, so that a serializer restored
+	 * from this snapshot pads {@code null} values with the same number of bytes.
+	 */
+	@SuppressWarnings({"unchecked", "WeakerAccess"})
+	public static class NullableSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<T, NullableSerializer<T>> {
+
+		private static final int VERSION = 2;
+
+		/** Number of padding bytes written for a {@code null} value; {@code 0} means no padding. */
+		private int nullPaddingLength;
+
+		/**
+		 * Nullary constructor; the padding length stays {@code 0} until
+		 * {@link #readOuterSnapshot} assigns it.
+		 */
+		@SuppressWarnings("unused")
+		public NullableSerializerSnapshot() {
+			super(serializerClass());
+		}
+
+		/**
+		 * Creates a snapshot of the given serializer, capturing its padding length.
+		 *
+		 * @param serializerInstance the serializer to snapshot
+		 */
+		public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) {
+			super(serializerInstance);
+			this.nullPaddingLength = serializerInstance.nullPaddingLength();
+		}
+
+		/**
+		 * Creates a snapshot that only carries a padding length and no serializer instance.
+		 *
+		 * @param nullPaddingLength number of padding bytes, must not be negative
+		 * @throws IllegalArgumentException if {@code nullPaddingLength} is negative
+		 */
+		private NullableSerializerSnapshot(int nullPaddingLength) {
+			super(serializerClass());
+			// Preconditions message templates substitute only %s placeholders.
+			checkArgument(nullPaddingLength >= 0,
+				"Computed NULL padding can not be negative. %s",
+				nullPaddingLength);
+
+			this.nullPaddingLength = nullPaddingLength;
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		/** Returns the single nested serializer: the wrapped original serializer. */
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(NullableSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.originalSerializer()};
+		}
+
+		/**
+		 * Rebuilds a {@link NullableSerializer} around the first nested serializer, using a
+		 * zero-filled padding array of the recorded length.
+		 *
+		 * @throws IllegalStateException if the recorded padding length is negative
+		 */
+		@Override
+		protected NullableSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			// Preconditions message templates substitute only %s placeholders.
+			checkState(nullPaddingLength >= 0,
+				"Negative padding size after serializer construction: %s",
+				nullPaddingLength);
+
+			final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength];
+			TypeSerializer<T> nestedSerializer = (TypeSerializer<T>) nestedSerializers[0];
+			return new NullableSerializer<>(nestedSerializer, padding);
+		}
+
+		/** Writes the padding length as a single {@code int}. */
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(nullPaddingLength);
+		}
+
+		/** Reads the padding length as a single {@code int}; the read version is not consulted. */
+		@Override
+		protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			nullPaddingLength = in.readInt();
+		}
+
+		/** The outer snapshot is compatible only if the new serializer uses the same padding length. */
+		@Override
+		protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
+			return nullPaddingLength == newSerializer.nullPaddingLength();
+		}
+
+		// Double cast through Class<?> because a class literal cannot carry the type argument T.
+		private static <T> Class<NullableSerializer<T>> serializerClass() {
+			return (Class<NullableSerializer<T>>) (Class<?>) NullableSerializer.class;
+		}
+	}
+
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 57c939d..2e363fb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
-
import org.junit.Test;
import java.io.IOException;
@@ -89,9 +88,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
TypeSerializer<ElementT> serializer = snapshot.restoreSerializer();
DataInputView input = dataUnderTest();
+
+ final Matcher<ElementT> matcher = testSpecification.testDataElementMatcher;
for (int i = 0; i < testSpecification.testDataCount; i++) {
final ElementT result = serializer.deserialize(input);
- assertThat(result, notNullValue());
+ assertThat(result, matcher);
}
}
@@ -193,6 +194,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
// Test Specification
// --------------------------------------------------------------------------------------------------------------
+ /**
+ * Test Specification.
+ */
+ @SuppressWarnings("WeakerAccess")
protected static final class TestSpecification<T> {
private final Class<? extends TypeSerializer<T>> serializerType;
private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
@@ -205,6 +210,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
private int testDataCount;
@SuppressWarnings("unchecked")
+ private Matcher<T> testDataElementMatcher = (Matcher<T>) notNullValue();
+
+ @SuppressWarnings("unchecked")
public static <T> TestSpecification<T> builder(
String name,
Class<? extends TypeSerializer> serializerClass,
@@ -253,6 +261,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
return this;
}
+ public TestSpecification<T> withTestDataMatcher(Matcher<T> matcher) {
+ testDataElementMatcher = matcher;
+ return this;
+ }
+
private TypeSerializer<T> createSerializer() {
try {
return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get();
@@ -274,10 +287,6 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
return testMigrationVersion;
}
- public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
- return snapshotClass;
- }
-
@Override
public String toString() {
return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
@@ -344,6 +353,45 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
/**
* Adds a test specification to be tested for all specified test versions.
*
+ * <p>This method adds the specification with pre-defined snapshot and data filenames,
+ * with the format {@code flink-<testVersion>-<specName>-<data/snapshot>},
+ * and each specification's test data count is assumed to always be 10.
+ *
+ * @param name test specification name.
+ * @param serializerClass class of the current serializer.
+ * @param snapshotClass class of the current serializer snapshot class.
+ * @param serializerProvider provider for an instance of the current serializer.
+ * @param elementMatcher a {@code hamcrest} matcher that each deserialized test data element must satisfy.
+ *
+ * @param <T> type of the test data.
+ */
+ public <T> void add(
+ String name,
+ Class<? extends TypeSerializer> serializerClass,
+ Class<? extends TypeSerializerSnapshot> snapshotClass,
+ Supplier<? extends TypeSerializer<T>> serializerProvider,
+ Matcher<T> elementMatcher) {
+ for (MigrationVersion testVersion : testVersions) {
+ testSpecifications.add(
+ TestSpecification.<T>builder(
+ getSpecNameForVersion(name, testVersion),
+ serializerClass,
+ snapshotClass,
+ testVersion)
+ .withNewSerializerProvider(serializerProvider)
+ .withSnapshotDataLocation(
+ String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
+ .withTestData(
+ String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
+ DEFAULT_TEST_DATA_COUNT)
+ .withTestDataMatcher(elementMatcher)
+ );
+ }
+ }
+
+ /**
+ * Adds a test specification to be tested for all specified test versions.
+ *
* @param name test specification name.
* @param serializerClass class of the current serializer.
* @param snapshotClass class of the current serializer snapshot class.
@@ -385,6 +433,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
}
}
+ /**
+ * Supplier of paths based on {@link MigrationVersion}.
+ */
protected interface TestResourceFilenameSupplier {
String get(MigrationVersion testVersion);
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
new file mode 100644
index 0000000..1da7da7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer.NullableSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * {@link NullableSerializer} migration test.
+ */
+@RunWith(Parameterized.class)
+public class NullableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Long> {
+
+ public NullableSerializerMigrationTest(TestSpecification<Long> testSpecification) {
+ super(testSpecification);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?>> testSpecifications() {
+
+ final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+ testSpecifications.add(
+ "nullable-padded-serializer",
+ NullableSerializer.class,
+ NullableSerializerSnapshot.class,
+ () -> NullableSerializer.wrap(LongSerializer.INSTANCE, true),
+ NULL_OR_LONG);
+
+ testSpecifications.add(
+ "nullable-not-padded-serializer",
+ NullableSerializer.class,
+ NullableSerializerSnapshot.class,
+ () -> NullableSerializer.wrap(LongSerializer.INSTANCE, false),
+ NULL_OR_LONG);
+
+ return testSpecifications.get();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final Matcher<Long> NULL_OR_LONG = new NullableMatcher();
+
+ private static final class NullableMatcher extends BaseMatcher<Long> {
+
+ @Override
+ public boolean matches(Object item) {
+ return item == null || item instanceof Long;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a null or a long");
+ }
+ }
+
+}
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..ca6d29f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..8c8b27f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data
new file mode 100644
index 0000000..42439f7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..9bd3af7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..aecf5b8
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..a2e0807
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data
new file mode 100644
index 0000000..426221e
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..685f8dd
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot differ