You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/23 09:22:46 UTC

[flink] 01/05: [FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edf6d59d315fc0f88d35d6686b39891864537620
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 14 16:46:48 2019 +0100

    [FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 36 +++++++++
 .../connectors/kafka/FlinkKafkaProducer.java       | 38 ++++++++++
 .../ParameterlessTypeSerializerConfig.java         |  4 +
 .../typeutils/SimpleTypeSerializerSnapshot.java    | 88 +++++-----------------
 .../common/typeutils/base/BigDecSerializer.java    |  3 +-
 .../common/typeutils/base/BigIntSerializer.java    |  3 +-
 .../common/typeutils/base/BooleanSerializer.java   |  3 +-
 .../typeutils/base/BooleanValueSerializer.java     |  3 +-
 .../api/common/typeutils/base/ByteSerializer.java  |  3 +-
 .../common/typeutils/base/ByteValueSerializer.java |  3 +-
 .../api/common/typeutils/base/CharSerializer.java  |  3 +-
 .../common/typeutils/base/CharValueSerializer.java |  3 +-
 .../api/common/typeutils/base/DateSerializer.java  |  3 +-
 .../common/typeutils/base/DoubleSerializer.java    |  3 +-
 .../typeutils/base/DoubleValueSerializer.java      |  3 +-
 .../api/common/typeutils/base/FloatSerializer.java |  3 +-
 .../typeutils/base/FloatValueSerializer.java       |  3 +-
 .../common/typeutils/base/InstantSerializer.java   |  3 +-
 .../api/common/typeutils/base/IntSerializer.java   |  3 +-
 .../common/typeutils/base/IntValueSerializer.java  |  3 +-
 .../api/common/typeutils/base/LongSerializer.java  |  3 +-
 .../common/typeutils/base/LongValueSerializer.java |  3 +-
 .../common/typeutils/base/NullValueSerializer.java |  3 +-
 .../api/common/typeutils/base/ShortSerializer.java |  3 +-
 .../typeutils/base/ShortValueSerializer.java       |  3 +-
 .../common/typeutils/base/SqlDateSerializer.java   |  3 +-
 .../common/typeutils/base/SqlTimeSerializer.java   |  3 +-
 .../typeutils/base/SqlTimestampSerializer.java     |  3 +-
 .../common/typeutils/base/StringSerializer.java    |  3 +-
 .../typeutils/base/StringValueSerializer.java      |  3 +-
 .../typeutils/base/TypeSerializerSingleton.java    | 12 ++-
 .../api/common/typeutils/base/VoidSerializer.java  |  3 +-
 .../array/BooleanPrimitiveArraySerializer.java     |  3 +-
 .../base/array/BytePrimitiveArraySerializer.java   |  3 +-
 .../base/array/CharPrimitiveArraySerializer.java   |  3 +-
 .../base/array/DoublePrimitiveArraySerializer.java |  3 +-
 .../base/array/FloatPrimitiveArraySerializer.java  |  3 +-
 .../base/array/IntPrimitiveArraySerializer.java    |  3 +-
 .../base/array/LongPrimitiveArraySerializer.java   |  3 +-
 .../base/array/ShortPrimitiveArraySerializer.java  |  3 +-
 .../base/array/StringArraySerializer.java          |  3 +-
 .../java/org/apache/flink/cep/nfa/DeweyNumber.java | 20 +++++
 .../apache/flink/cep/nfa/NFAStateSerializer.java   | 19 +++++
 .../apache/flink/cep/nfa/sharedbuffer/EventId.java | 20 +++++
 .../apache/flink/cep/nfa/sharedbuffer/NodeId.java  | 19 +++++
 .../cep/nfa/sharedbuffer/SharedBufferEdge.java     | 20 +++++
 .../cep/nfa/sharedbuffer/SharedBufferNode.java     | 20 +++++
 .../transform/LongValueWithProperHashCode.java     | 21 +++++-
 .../types/valuearray/ByteValueArraySerializer.java | 20 ++++-
 .../types/valuearray/CharValueArraySerializer.java | 21 +++++-
 .../valuearray/DoubleValueArraySerializer.java     | 21 +++++-
 .../valuearray/FloatValueArraySerializer.java      | 21 +++++-
 .../types/valuearray/IntValueArraySerializer.java  | 21 +++++-
 .../types/valuearray/LongValueArraySerializer.java | 21 +++++-
 .../types/valuearray/NullValueArraySerializer.java | 20 +++++
 .../valuearray/ShortValueArraySerializer.java      | 21 +++++-
 .../valuearray/StringValueArraySerializer.java     | 21 +++++-
 .../client/VoidNamespaceSerializer.java            | 20 +++++
 .../apache/flink/runtime/state/JavaSerializer.java | 20 +++++
 .../runtime/state/VoidNamespaceSerializer.java     | 20 +++++
 .../flink/api/scala/typeutils/UnitSerializer.scala | 27 +++++++
 .../api/windowing/windows/GlobalWindow.java        | 20 +++++
 .../api/windowing/windows/TimeWindow.java          | 20 +++++
 .../jar/CheckpointingCustomKvStateProgram.java     | 20 +++++
 64 files changed, 610 insertions(+), 149 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3e7cf2b..129ed66 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -1230,6 +1232,23 @@ public class FlinkKafkaProducer011<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof TransactionStateSerializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
+			return new TransactionStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
+			public TransactionStateSerializerSnapshot() {
+				super(TransactionStateSerializer::new);
+			}
+		}
 	}
 
 	/**
@@ -1312,6 +1331,23 @@ public class FlinkKafkaProducer011<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof ContextStateSerializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+			return new ContextStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+			public ContextStateSerializerSnapshot() {
+				super(ContextStateSerializer::new);
+			}
+		}
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 10e8ef1..15b5b9f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -1236,6 +1238,24 @@ public class FlinkKafkaProducer<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof FlinkKafkaProducer.TransactionStateSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() {
+			return new TransactionStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
+
+			public TransactionStateSerializerSnapshot() {
+				super(TransactionStateSerializer::new);
+			}
+		}
 	}
 
 	/**
@@ -1318,6 +1338,24 @@ public class FlinkKafkaProducer<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof FlinkKafkaProducer.ContextStateSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+			return new ContextStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+
+			public ContextStateSerializerSnapshot() {
+				super(ContextStateSerializer::new);
+			}
+		}
 	}
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
index 6fc6d17..29da90a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -27,8 +27,12 @@ import java.io.IOException;
 
 /**
  * A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
+ *
+ * @deprecated this snapshot class is no longer used by any serializers, and is maintained only
+ *             for backward compatibility reasons. It is fully replaced by {@link SimpleTypeSerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {
 
 	private static final int VERSION = 1;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
index 03fc1cb..dca34e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
@@ -21,15 +21,13 @@ package org.apache.flink.api.common.typeutils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A simple base class for TypeSerializerSnapshots, for serializers that have no
@@ -46,24 +44,18 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 	 * backwards compatible code paths in case we decide to make this snapshot backwards compatible with
 	 * the {@link ParameterlessTypeSerializerConfig}.
 	 */
-	private static final int CURRENT_VERSION = 2;
+	private static final int CURRENT_VERSION = 3;
 
 	/** The class of the serializer for this snapshot.
 	 * The field is null if the serializer was created for read and has not been read, yet. */
-	@Nullable
-	private Class<? extends TypeSerializer<T>> serializerClass;
-
-	/**
-	 * Default constructor for instantiation on restore (reading the snapshot).
-	 */
-	@SuppressWarnings("unused")
-	public SimpleTypeSerializerSnapshot() {}
+	@Nonnull
+	private Supplier<? extends TypeSerializer<T>> serializerSupplier;
 
 	/**
 	 * Constructor to create snapshot from serializer (writing the snapshot).
 	 */
-	public SimpleTypeSerializerSnapshot(@Nonnull Class<? extends TypeSerializer<T>> serializerClass) {
-		this.serializerClass = checkNotNull(serializerClass);
+	public SimpleTypeSerializerSnapshot(@Nonnull Supplier<? extends TypeSerializer<T>> serializerSupplier) {
+		this.serializerSupplier = checkNotNull(serializerSupplier);
 	}
 
 	// ------------------------------------------------------------------------
@@ -77,42 +69,39 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 
 	@Override
 	public TypeSerializer<T> restoreSerializer() {
-		checkState(serializerClass != null);
-		return InstantiationUtil.instantiate(serializerClass);
+		return serializerSupplier.get();
 	}
 
 	@Override
 	public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
 
-		checkState(serializerClass != null);
-		return newSerializer.getClass() == serializerClass ?
+		return newSerializer.getClass() == serializerSupplier.get().getClass() ?
 				TypeSerializerSchemaCompatibility.compatibleAsIs() :
 				TypeSerializerSchemaCompatibility.incompatible();
 	}
 
 	@Override
 	public void writeSnapshot(DataOutputView out) throws IOException {
-		checkState(serializerClass != null);
-		out.writeUTF(serializerClass.getName());
+		//
 	}
 
 	@Override
 	public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
 		switch (readVersion) {
-			case 2:
-				read(in, classLoader);
+			case 3: {
 				break;
-			default:
+			}
+			case 2: {
+				// we don't need the classname any more; read and drop to maintain compatibility
+				in.readUTF();
+				break;
+			}
+			default: {
 				throw new IOException("Unrecognized version: " + readVersion);
+			}
 		}
 	}
 
-	private void read(DataInputView in, ClassLoader classLoader) throws IOException {
-		final String className = in.readUTF();
-		final Class<?> clazz = resolveClassName(className, classLoader, false);
-		this.serializerClass = cast(clazz);
-	}
-
 	// ------------------------------------------------------------------------
 	//  standard utilities
 	// ------------------------------------------------------------------------
@@ -131,45 +120,4 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 	public String toString() {
 		return getClass().getName();
 	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static Class<?> resolveClassName(String className, ClassLoader cl, boolean allowCanonicalName) throws IOException {
-		try {
-			return Class.forName(className, false, cl);
-		}
-		catch (ClassNotFoundException e) {
-			if (allowCanonicalName) {
-				try {
-					return Class.forName(guessClassNameFromCanonical(className), false, cl);
-				}
-				catch (ClassNotFoundException ignored) {}
-			}
-
-			// throw with original ClassNotFoundException
-			throw new IOException(
-						"Failed to read SimpleTypeSerializerSnapshot: Serializer class not found: " + className, e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> Class<? extends TypeSerializer<T>> cast(Class<?> clazz) throws IOException {
-		if (!TypeSerializer.class.isAssignableFrom(clazz)) {
-			throw new IOException("Failed to read SimpleTypeSerializerSnapshot. " +
-					"Serializer class name leads to a class that is not a TypeSerializer: " + clazz.getName());
-		}
-
-		return (Class<? extends TypeSerializer<T>>) clazz;
-	}
-
-	static String guessClassNameFromCanonical(String className) {
-		int lastDot = className.lastIndexOf('.');
-		if (lastDot > 0 && lastDot < className.length() - 1) {
-			return className.substring(0, lastDot) + '$' + className.substring(lastDot + 1);
-		} else {
-			return className;
-		}
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
index ed5a3a0..7e4e97f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
@@ -151,10 +151,11 @@ public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BigDecSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigDecimal> {
 
 		public BigDecSerializerSnapshot() {
-			super(BigDecSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
index b4c3a19..ef4358e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -156,10 +156,11 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BigIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigInteger> {
 
 		public BigIntSerializerSnapshot() {
-			super(BigIntSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 21c558b..84056e5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -99,10 +99,11 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanSerializerSnapshot extends SimpleTypeSerializerSnapshot<Boolean> {
 
 		public BooleanSerializerSnapshot() {
-			super(BooleanSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index e66fe06..3e497ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -98,10 +98,11 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<BooleanValue> {
 
 		public BooleanValueSerializerSnapshot() {
-			super(BooleanValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index ab9061f..5fdd9b6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -99,10 +99,11 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ByteSerializerSnapshot extends SimpleTypeSerializerSnapshot<Byte> {
 
 		public ByteSerializerSnapshot() {
-			super(ByteSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index e7d92bb..6f0cb76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -96,10 +96,11 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ByteValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValue> {
 
 		public ByteValueSerializerSnapshot() {
-			super(ByteValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 7805d54..cee0d0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -99,10 +99,11 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharSerializerSnapshot extends SimpleTypeSerializerSnapshot<Character> {
 
 		public CharSerializerSnapshot() {
-			super(CharSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 697513a..aa580fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -96,10 +96,11 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValue> {
 
 		public CharValueSerializerSnapshot() {
-			super(CharValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 21c8432..0675e22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -114,10 +114,11 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
 
 		public DateSerializerSnapshot() {
-			super(DateSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 447c548..cd4854b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -99,10 +99,11 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoubleSerializerSnapshot extends SimpleTypeSerializerSnapshot<Double> {
 
 		public DoubleSerializerSnapshot() {
-			super(DoubleSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 27b9fe2..c0c6a27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -96,10 +96,11 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoubleValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValue> {
 
 		public DoubleValueSerializerSnapshot() {
-			super(DoubleValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b88fbaf..6da6f47 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -99,10 +99,11 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatSerializerSnapshot extends SimpleTypeSerializerSnapshot<Float> {
 
 		public FloatSerializerSnapshot() {
-			super(FloatSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index f5292ce..1bfc87d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -96,10 +96,11 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValue> {
 
 		public FloatValueSerializerSnapshot() {
-			super(FloatValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
index d39aba2..17f1b7a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -115,10 +115,11 @@ public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class InstantSerializerSnapshot extends SimpleTypeSerializerSnapshot<Instant> {
 
 		public InstantSerializerSnapshot() {
-			super(InstantSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 0c87786..6d0d407 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -101,8 +101,9 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 	 */
 	public static final class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
 
+		@SuppressWarnings("WeakerAccess")
 		public IntSerializerSnapshot() {
-			super(IntSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index fae1bb6..846df31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -96,10 +96,11 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class IntValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValue> {
 
 		public IntValueSerializerSnapshot() {
-			super(IntValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 8100307..24cf7c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -99,10 +99,11 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongSerializerSnapshot extends SimpleTypeSerializerSnapshot<Long> {
 
 		public LongSerializerSnapshot() {
-			super(LongSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 2d1fea8..9f4b5d8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -96,10 +96,11 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValue> {
 
 		public LongValueSerializerSnapshot() {
-			super(LongValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
index 233cc56..48c0b57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
@@ -92,10 +92,11 @@ public final class NullValueSerializer extends TypeSerializerSingleton<NullValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class NullValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValue> {
 
 		public NullValueSerializerSnapshot() {
-			super(NullValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index d1c78ec..fa6189d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -99,10 +99,11 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortSerializerSnapshot extends SimpleTypeSerializerSnapshot<Short> {
 
 		public ShortSerializerSnapshot() {
-			super(ShortSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index b8f815b..e667faa4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -96,10 +96,11 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValue> {
 
 		public ShortValueSerializerSnapshot() {
-			super(ShortValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
index 6b780a3..ab96dd9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
@@ -115,10 +115,11 @@ public final class SqlDateSerializer extends TypeSerializerSingleton<Date> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlDateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
 
 		public SqlDateSerializerSnapshot() {
-			super(SqlDateSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
index 9218ca1..b5bf7b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
@@ -119,10 +119,11 @@ public final class SqlTimeSerializer extends TypeSerializerSingleton<Time> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlTimeSerializerSnapshot extends SimpleTypeSerializerSnapshot<Time> {
 
 		public SqlTimeSerializerSnapshot() {
-			super(SqlTimeSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
index e35c78b..5d7760c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
@@ -123,10 +123,11 @@ public final class SqlTimestampSerializer extends TypeSerializerSingleton<Timest
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlTimestampSerializerSnapshot extends SimpleTypeSerializerSnapshot<Timestamp> {
 
 		public SqlTimestampSerializerSnapshot() {
-			super(SqlTimestampSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 0761779..dc67a9c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -100,10 +100,11 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringSerializerSnapshot extends SimpleTypeSerializerSnapshot<String> {
 
 		public StringSerializerSnapshot() {
-			super(StringSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 9075cf4..498b0f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -121,10 +121,11 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValue> {
 
 		public StringValueSerializerSnapshot() {
-			super(StringValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 4f01d76..437aa14 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 
 @Internal
 public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
@@ -53,13 +52,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
 		}
 	}
 
+	/**
+	 * @deprecated This method is kept around only for backwards compatibility.
+	 *             It can be removed only once {@link ParameterlessTypeSerializerConfig} is removed.
+	 */
 	@Override
-	public TypeSerializerSnapshot<T> snapshotConfiguration() {
-		// type serializer singletons should always be parameter-less
-		return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
-	}
-
-	@Override
+	@Deprecated
 	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof ParameterlessTypeSerializerConfig
 				&& isCompatibleSerializationFormatIdentifier(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index ec87e2b..d9fae48 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -100,10 +100,11 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class VoidSerializerSnapshot extends SimpleTypeSerializerSnapshot<Void> {
 
 		public VoidSerializerSnapshot() {
-			super(VoidSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index ecfd604..a8f76c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -120,10 +120,11 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<boolean[]> {
 
 		public BooleanPrimitiveArraySerializerSnapshot() {
-			super(BooleanPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index 414ef17..803bd53 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -113,10 +113,11 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BytePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<byte[]> {
 
 		public BytePrimitiveArraySerializerSnapshot() {
-			super(BytePrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index f08062b..1e2357a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<char[]> {
 
 		public CharPrimitiveArraySerializerSnapshot() {
-			super(CharPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index a38ae66..5deddc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoublePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<double[]> {
 
 		public DoublePrimitiveArraySerializerSnapshot() {
-			super(DoublePrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index d447793..f65bcf3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<float[]> {
 
 		public FloatPrimitiveArraySerializerSnapshot() {
-			super(FloatPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index cbfa04d..7923eca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class IntPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<int[]> {
 
 		public IntPrimitiveArraySerializerSnapshot() {
-			super(IntPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index f56978c..66afb32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<long[]> {
 
 		public LongPrimitiveArraySerializerSnapshot() {
-			super(LongPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index b9dd1a9..a93dd71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<short[]> {
 
 		public ShortPrimitiveArraySerializerSnapshot() {
-			super(ShortPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index 1cdf317..7c81aa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -123,9 +123,10 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<String[]> {
 		public StringArraySerializerSnapshot() {
-			super(StringArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 68e0eec..53fffce 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -272,5 +274,23 @@ public class DeweyNumber implements Serializable {
 		public int hashCode() {
 			return elemSerializer.hashCode();
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<DeweyNumber> snapshotConfiguration() {
+			return new DeweyNumberSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class DeweyNumberSerializerSnapshot extends SimpleTypeSerializerSnapshot<DeweyNumber> {
+
+			public DeweyNumberSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index 05b6c91..ccbe25c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -187,4 +189,21 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return true;
 	}
 
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<NFAState> snapshotConfiguration() {
+		return new NFAStateSerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
+
+		public NFAStateSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index c1a6ccb..045cf38 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -147,5 +149,23 @@ public class EventId implements Comparable<EventId> {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(EventIdSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<EventId> snapshotConfiguration() {
+			return new EventIdSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class EventIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<EventId> {
+
+			public EventIdSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index 3a13184..87dc2c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -151,5 +153,22 @@ public class NodeId {
 			return obj.getClass().equals(NodeIdSerializer.class);
 		}
 
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
+			return new NodeIdSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
+
+			public NodeIdSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index c8d9021..2af92f5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.DeweyNumber;
 import org.apache.flink.core.memory.DataInputView;
@@ -122,5 +124,23 @@ public class SharedBufferEdge {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(SharedBufferEdgeSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
+			return new SharedBufferEdgeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
+
+			public SharedBufferEdgeSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index b613625..96afdbb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
@@ -116,5 +118,23 @@ public class SharedBufferNode {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(SharedBufferNodeSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
+			return new SharedBufferNodeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class SharedBufferNodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferNode> {
+
+			public SharedBufferNodeSerializerSnapshot() {
+				super(SharedBufferNodeSerializer::new);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
index 22a681c..6fc7bd0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongValueComparator;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -220,10 +221,22 @@ extends LongValue {
 			return obj instanceof LongValueWithProperHashCodeSerializer;
 		}
 
+		// -----------------------------------------------------------------------------------
+
 		@Override
-		protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-			return super.isCompatibleSerializationFormatIdentifier(identifier)
-				|| identifier.equals(LongSerializer.class.getCanonicalName());
+		public TypeSerializerSnapshot<LongValueWithProperHashCode> snapshotConfiguration() {
+			return new LongValueWithProperHashCodeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class LongValueWithProperHashCodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueWithProperHashCode> {
+
+			public LongValueWithProperHashCodeSerializerSnapshot() {
+				super(LongValueWithProperHashCodeSerializer::new);
+			}
 		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
index a49359a..c690543 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,20 @@ public final class ByteValueArraySerializer extends TypeSerializerSingleton<Byte
 		return obj instanceof ByteValueArraySerializer;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<ByteValueArray> snapshotConfiguration() {
+		return new ByteValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link ByteValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class ByteValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValueArray> {
+		public ByteValueArraySerializerSnapshot() {
+			super(ByteValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
index 4d97cdb..0844986 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class CharValueArraySerializer extends TypeSerializerSingleton<Char
 		return obj instanceof CharValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<CharValueArray> snapshotConfiguration() {
+		return new CharValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link CharValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class CharValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValueArray> {
+
+		public CharValueArraySerializerSnapshot() {
+			super(CharValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
index 5808946..be683b5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class DoubleValueArraySerializer extends TypeSerializerSingleton<Do
 		return obj instanceof DoubleValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<DoubleValueArray> snapshotConfiguration() {
+		return new DoubleValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link DoubleValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class DoubleValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValueArray> {
+
+		public DoubleValueArraySerializerSnapshot() {
+			super(DoubleValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
index fc78fd2..0fb0f61 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class FloatValueArraySerializer extends TypeSerializerSingleton<Flo
 		return obj instanceof FloatValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<FloatValueArray> snapshotConfiguration() {
+		return new FloatValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link FloatValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class FloatValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValueArray> {
+
+		public FloatValueArraySerializerSnapshot() {
+			super(FloatValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index 5984122..c00c136 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
 		return obj instanceof IntValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<IntValueArray> snapshotConfiguration() {
+		return new IntValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link IntValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class IntValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValueArray> {
+
+		public IntValueArraySerializerSnapshot() {
+			super(IntValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index ade46ca..55a79b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
 		return obj instanceof LongValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<LongValueArray> snapshotConfiguration() {
+		return new LongValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link LongValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class LongValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueArray> {
+
+		public LongValueArraySerializerSnapshot() {
+			super(LongValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
index 233ed20..732bb0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +84,22 @@ public final class NullValueArraySerializer extends TypeSerializerSingleton<Null
 	public boolean canEqual(Object obj) {
 		return obj instanceof NullValueArraySerializer;
 	}
+
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<NullValueArray> snapshotConfiguration() {
+		return new NullValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link NullValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class NullValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValueArray> {
+
+		public NullValueArraySerializerSnapshot() {
+			super(NullValueArraySerializer::new);
+		}
+	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
index bce4d81..2000799 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class ShortValueArraySerializer extends TypeSerializerSingleton<Sho
 		return obj instanceof ShortValueArraySerializer;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<ShortValueArray> snapshotConfiguration() {
+		return new ShortValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link ShortValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class ShortValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValueArray> {
+
+		public ShortValueArraySerializerSnapshot() {
+			super(ShortValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 6dbe0e5..d3dbe70 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
 		return obj instanceof StringValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(StringArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<StringValueArray> snapshotConfiguration() {
+		return new StringValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link StringValueArraySerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class StringValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValueArray> {
+
+		public StringValueArraySerializerSnapshot() {
+			super(StringValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
index 38db705..e6a9f10 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.queryablestate.client;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -93,4 +95,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
 	public boolean canEqual(Object obj) {
 		return obj instanceof VoidNamespaceSerializer;
 	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+		return new VoidNamespaceSerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 * Declared static so that it can be created without an enclosing serializer instance.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+		public VoidNamespaceSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 7d9e888..60b5fd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
@@ -97,4 +99,22 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 	public boolean canEqual(Object obj) {
 		return obj instanceof JavaSerializer;
 	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
+		return new JavaSerializerSnapshot<>();
+	}
+
+	/**
+	 * Configuration snapshot of {@link JavaSerializer}, whose supplier is the serializer's no-arg constructor.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class JavaSerializerSnapshot<T extends Serializable> extends SimpleTypeSerializerSnapshot<T> {
+
+		public JavaSerializerSnapshot() {
+			super(JavaSerializer::new);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..c861acf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -89,4 +91,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
 	public boolean canEqual(Object obj) {
 		return obj instanceof VoidNamespaceSerializer;
 	}
+
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+		return new VoidNamespaceSerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot of {@link VoidNamespaceSerializer}, whose supplier returns {@code INSTANCE}.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+
+		public VoidNamespaceSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
index 32c44d2..8a49b18 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
@@ -17,8 +17,12 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import java.util.function.Supplier
+
 import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.api.scala.typeutils.UnitSerializer.UnitSerializerSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 @Internal
@@ -58,4 +62,27 @@ class UnitSerializer extends TypeSerializerSingleton[Unit] {
   override def canEqual(obj: scala.Any): Boolean = {
     obj.isInstanceOf[UnitSerializer]
   }
+
+  // -----------------------------------------------------------------------------------
+
+  /**
+    * Returns a new [[UnitSerializerSnapshot]] describing this serializer.
+    * The snapshot class is declared in the companion object and imported
+    * at the top of this file.
+    */
+  override def snapshotConfiguration(): TypeSerializerSnapshot[Unit] = {
+    new UnitSerializerSnapshot()
+  }
+}
+
+object UnitSerializer {
+
+  /**
+    * Configuration snapshot of [[UnitSerializer]], whose supplier creates a new serializer instance.
+    */
+  final class UnitSerializerSnapshot
+        extends SimpleTypeSerializerSnapshot[Unit](
+    new Supplier[TypeSerializer[Unit]] {
+      override def get() = new UnitSerializer()
+    })
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index 835197f..0e09b2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -19,7 +19,9 @@
 package org.apache.flink.streaming.api.windowing.windows;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -120,5 +122,23 @@ public class GlobalWindow extends Window {
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<GlobalWindow> snapshotConfiguration() {
+			return new GlobalWindowSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot of {@link GlobalWindow.Serializer}, whose supplier is the serializer's no-arg constructor.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class GlobalWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<GlobalWindow> {
+
+			public GlobalWindowSerializerSnapshot() {
+				super(GlobalWindow.Serializer::new);
+			}
+		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0e89294..a778ebf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.api.windowing.windows;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -188,6 +190,24 @@ public class TimeWindow extends Window {
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<TimeWindow> snapshotConfiguration() {
+			return new TimeWindowSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot of {@link TimeWindow.Serializer}, whose supplier is the serializer's no-arg constructor.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<TimeWindow> {
+
+			public TimeWindowSerializerSnapshot() {
+				super(Serializer::new);
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index aa8e59e..67ada5d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -230,5 +232,23 @@ public class CheckpointingCustomKvStateProgram {
 		public boolean canEqual(Object obj) {
 			return obj instanceof CustomIntSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
+			return new CustomIntSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot of {@link CustomIntSerializer}, whose supplier returns {@code INSTANCE}.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class CustomIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
+
+			public CustomIntSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }