You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:11 UTC

[flink] 06/18: [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b22e91358fd2e58fc064968953a73218e87cfc88
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 15:27:50 2019 +0100

    [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API
---
 .../java/typeutils/runtime/NullableSerializer.java | 179 +++++++++++++++------
 .../TypeSerializerSnapshotMigrationTestBase.java   |  63 +++++++-
 .../runtime/NullableSerializerMigrationTest.java   |  83 ++++++++++
 .../flink-1.6-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.6-nullable-not-padded-serializer-snapshot | Bin 0 -> 944 bytes
 .../flink-1.6-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.6-nullable-padded-serializer-snapshot  | Bin 0 -> 952 bytes
 .../flink-1.7-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.7-nullable-not-padded-serializer-snapshot | Bin 0 -> 941 bytes
 .../flink-1.7-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.7-nullable-padded-serializer-snapshot  | Bin 0 -> 949 bytes
 11 files changed, 270 insertions(+), 55 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index fe392e4..05941fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -19,24 +19,23 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Serializer wrapper to add support of {@code null} value serialization.
@@ -65,12 +64,15 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	private final byte[] padding;
 
 	private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
-		this.originalSerializer = originalSerializer;
-		this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);
+		this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen));
+	}
 
+	private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, byte[] padding) {
+		this.originalSerializer = originalSerializer;
+		this.padding = padding;
 	}
 
-	private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
+	private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
 		boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
 		return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
 	}
@@ -79,7 +81,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	 * This method tries to serialize {@code null} value with the {@code originalSerializer}
 	 * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}.
 	 *
-	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param originalSerializer     serializer to wrap and add {@code null} support
 	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
 	 * @return serializer which supports {@code null} values
 	 */
@@ -99,22 +101,24 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		DataOutputSerializer dos = new DataOutputSerializer(length);
 		try {
 			serializer.serialize(null, dos);
-		} catch (IOException | RuntimeException e) {
+		}
+		catch (IOException | RuntimeException e) {
 			return false;
 		}
-		Preconditions.checkArgument(
+		checkArgument(
 			serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
 			"The serialized form of the null value should have the same length " +
 				"as any other if the length is fixed in the serializer");
 		DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
 		try {
-			Preconditions.checkArgument(serializer.deserialize(dis) == null);
-		} catch (IOException e) {
+			checkArgument(serializer.deserialize(dis) == null);
+		}
+		catch (IOException e) {
 			throw new RuntimeException(
 				String.format("Unexpected failure to deserialize just serialized null value with %s",
 					serializer.getClass().getName()), e);
 		}
-		Preconditions.checkArgument(
+		checkArgument(
 			serializer.copy(null) == null,
 			"Serializer %s has to be able properly copy null value if it can serialize it",
 			serializer.getClass().getName());
@@ -125,10 +129,18 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		return padding.length > 0;
 	}
 
+	private int nullPaddingLength() {
+		return padding.length;
+	}
+
+	private TypeSerializer<T> originalSerializer() {
+		return originalSerializer;
+	}
+
 	/**
 	 * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
 	 *
-	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param originalSerializer     serializer to wrap and add {@code null} support
 	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
 	 * @return wrapped serializer which supports {@code null} values
 	 */
@@ -176,7 +188,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		if (record == null) {
 			target.writeBoolean(true);
 			target.write(padding);
-		} else {
+		}
+		else {
 			target.writeBoolean(false);
 			originalSerializer.serialize(record, target);
 		}
@@ -209,7 +222,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		target.writeBoolean(isNull);
 		if (isNull) {
 			target.write(padding);
-		} else {
+		}
+		else {
 			originalSerializer.copy(source, target);
 		}
 	}
@@ -233,45 +247,29 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new NullableSerializerConfigSnapshot<>(originalSerializer);
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
+		return new NullableSerializerSnapshot<>(this);
 	}
 
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-				((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-				previousKvSerializersAndConfigs.get(0).f0,
-				UnloadableDummyTypeSerializer.class,
-				previousKvSerializersAndConfigs.get(0).f1,
-				originalSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new NullableSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
 
 	/**
 	 * Configuration snapshot for serializers of nullable types, containing the
 	 * configuration snapshot of its original serializer.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for
+	 *             backwards compatibility purposes. It is fully replaced
+	 *             by {@link NullableSerializerSnapshot}.
 	 */
+	@Deprecated
 	@Internal
-	public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<T> {
 		private static final int VERSION = 1;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		@SuppressWarnings("unused")
-		public NullableSerializerConfigSnapshot() {}
+		/**
+		 * This empty nullary constructor is required for deserializing the configuration.
+		 */
+		public NullableSerializerConfigSnapshot() {
+		}
 
 		NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
 			super(originalSerializer);
@@ -281,5 +279,88 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+			NullableSerializer<T> previousSerializer = (NullableSerializer<T>) restoreSerializer();
+			NullableSerializerSnapshot<T> newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength());
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				newCompositeSnapshot,
+				getSingleNestedSerializerAndConfig().f1
+			);
+		}
 	}
+
+	/**
+	 * Snapshot for serializers of nullable types, containing the
+	 * snapshot of its original serializer.
+	 */
+	@SuppressWarnings({"unchecked", "WeakerAccess"})
+	public static class NullableSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<T, NullableSerializer<T>> {
+
+		private static final int VERSION = 2;
+		private int nullPaddingLength;
+
+		@SuppressWarnings("unused")
+		public NullableSerializerSnapshot() {
+			super(serializerClass());
+		}
+
+		public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) {
+			super(serializerInstance);
+			this.nullPaddingLength = serializerInstance.nullPaddingLength();
+		}
+
+		private NullableSerializerSnapshot(int nullPaddingLength) {
+			super(serializerClass());
+			checkArgument(nullPaddingLength >= 0,
+				"Computed NULL padding can not be negative. %d",
+				nullPaddingLength);
+
+			this.nullPaddingLength = nullPaddingLength;
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(NullableSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.originalSerializer()};
+		}
+
+		@Override
+		protected NullableSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			checkState(nullPaddingLength >= 0,
+				"Negative padding size after serializer construction: %d",
+				nullPaddingLength);
+
+			final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength];
+			TypeSerializer<T> nestedSerializer = (TypeSerializer<T>) nestedSerializers[0];
+			return new NullableSerializer<>(nestedSerializer, padding);
+		}
+
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(nullPaddingLength);
+		}
+
+		@Override
+		protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			nullPaddingLength = in.readInt();
+		}
+
+		@Override
+		protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
+			return nullPaddingLength == newSerializer.nullPaddingLength();
+		}
+
+		private static <T> Class<NullableSerializer<T>> serializerClass() {
+			return (Class<NullableSerializer<T>>) (Class<?>) NullableSerializer.class;
+		}
+	}
+
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 57c939d..2e363fb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
-
 import org.junit.Test;
 
 import java.io.IOException;
@@ -89,9 +88,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		TypeSerializer<ElementT> serializer = snapshot.restoreSerializer();
 
 		DataInputView input = dataUnderTest();
+
+		final Matcher<ElementT> matcher = testSpecification.testDataElementMatcher;
 		for (int i = 0; i < testSpecification.testDataCount; i++) {
 			final ElementT result = serializer.deserialize(input);
-			assertThat(result, notNullValue());
+			assertThat(result, matcher);
 		}
 	}
 
@@ -193,6 +194,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 	// Test Specification
 	// --------------------------------------------------------------------------------------------------------------
 
+	/**
+	 * Test Specification.
+	 */
+	@SuppressWarnings("WeakerAccess")
 	protected static final class TestSpecification<T> {
 		private final Class<? extends TypeSerializer<T>> serializerType;
 		private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
@@ -205,6 +210,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private int testDataCount;
 
 		@SuppressWarnings("unchecked")
+		private Matcher<T> testDataElementMatcher = (Matcher<T>) notNullValue();
+
+		@SuppressWarnings("unchecked")
 		public static <T> TestSpecification<T> builder(
 			String name,
 			Class<? extends TypeSerializer> serializerClass,
@@ -253,6 +261,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return this;
 		}
 
+		public TestSpecification<T> withTestDataMatcher(Matcher<T> matcher) {
+			testDataElementMatcher = matcher;
+			return this;
+		}
+
 		private TypeSerializer<T> createSerializer() {
 			try {
 				return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get();
@@ -274,10 +287,6 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return testMigrationVersion;
 		}
 
-		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
-			return snapshotClass;
-		}
-
 		@Override
 		public String toString() {
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
@@ -344,6 +353,45 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		/**
 		 * Adds a test specification to be tested for all specified test versions.
 		 *
+		 * <p>This method adds the specification with pre-defined snapshot and data filenames,
+		 * with the format "flink-&lt;testVersion&gt;-&lt;specName&gt;-&lt;data/snapshot&gt;",
+		 * and each specification's test data count is assumed to always be 10.
+		 *
+		 * @param name test specification name.
+		 * @param serializerClass class of the current serializer.
+		 * @param snapshotClass class of the current serializer snapshot class.
+		 * @param serializerProvider provider for an instance of the current serializer.
+		 * @param elementMatcher an {@code hamcrest} matcher to match test data.
+		 *
+		 * @param <T> type of the test data.
+		 */
+		public <T> void add(
+			String name,
+			Class<? extends TypeSerializer> serializerClass,
+			Class<? extends TypeSerializerSnapshot> snapshotClass,
+			Supplier<? extends TypeSerializer<T>> serializerProvider,
+			Matcher<T> elementMatcher)  {
+			for (MigrationVersion testVersion : testVersions) {
+				testSpecifications.add(
+					TestSpecification.<T>builder(
+						getSpecNameForVersion(name, testVersion),
+						serializerClass,
+						snapshotClass,
+						testVersion)
+						.withNewSerializerProvider(serializerProvider)
+						.withSnapshotDataLocation(
+							String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
+						.withTestData(
+							String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
+							DEFAULT_TEST_DATA_COUNT)
+					.withTestDataMatcher(elementMatcher)
+				);
+			}
+		}
+
+		/**
+		 * Adds a test specification to be tested for all specified test versions.
+		 *
 		 * @param name test specification name.
 		 * @param serializerClass class of the current serializer.
 		 * @param snapshotClass class of the current serializer snapshot class.
@@ -385,6 +433,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		}
 	}
 
+	/**
+	 * Supplier of paths based on {@link MigrationVersion}.
+	 */
 	protected interface TestResourceFilenameSupplier {
 		String get(MigrationVersion testVersion);
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
new file mode 100644
index 0000000..1da7da7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer.NullableSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * {@link NullableSerializer} migration test.
+ */
+@RunWith(Parameterized.class)
+public class NullableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Long> {
+
+	public NullableSerializerMigrationTest(TestSpecification<Long> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"nullable-padded-serializer",
+			NullableSerializer.class,
+			NullableSerializerSnapshot.class,
+			() -> NullableSerializer.wrap(LongSerializer.INSTANCE, true),
+			NULL_OR_LONG);
+
+		testSpecifications.add(
+			"nullable-not-padded-serializer",
+			NullableSerializer.class,
+			NullableSerializerSnapshot.class,
+			() -> NullableSerializer.wrap(LongSerializer.INSTANCE, false),
+			NULL_OR_LONG);
+
+		return testSpecifications.get();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static final Matcher<Long> NULL_OR_LONG = new NullableMatcher();
+
+	private static final class NullableMatcher extends BaseMatcher<Long> {
+
+		@Override
+		public boolean matches(Object item) {
+			return item == null || item instanceof Long;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendText("a null or a long");
+		}
+	}
+
+}
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..ca6d29f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..8c8b27f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data
new file mode 100644
index 0000000..42439f7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..9bd3af7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..aecf5b8
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..a2e0807
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data
new file mode 100644
index 0000000..426221e
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..685f8dd
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot differ