You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/23 09:22:46 UTC
[flink] 01/05: [FLINK-11328] [core] Upgrade parameterless /
singleton serializers to use new serialization compatibility APIs
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit edf6d59d315fc0f88d35d6686b39891864537620
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 14 16:46:48 2019 +0100
[FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs
---
.../connectors/kafka/FlinkKafkaProducer011.java | 36 +++++++++
.../connectors/kafka/FlinkKafkaProducer.java | 38 ++++++++++
.../ParameterlessTypeSerializerConfig.java | 4 +
.../typeutils/SimpleTypeSerializerSnapshot.java | 88 +++++-----------------
.../common/typeutils/base/BigDecSerializer.java | 3 +-
.../common/typeutils/base/BigIntSerializer.java | 3 +-
.../common/typeutils/base/BooleanSerializer.java | 3 +-
.../typeutils/base/BooleanValueSerializer.java | 3 +-
.../api/common/typeutils/base/ByteSerializer.java | 3 +-
.../common/typeutils/base/ByteValueSerializer.java | 3 +-
.../api/common/typeutils/base/CharSerializer.java | 3 +-
.../common/typeutils/base/CharValueSerializer.java | 3 +-
.../api/common/typeutils/base/DateSerializer.java | 3 +-
.../common/typeutils/base/DoubleSerializer.java | 3 +-
.../typeutils/base/DoubleValueSerializer.java | 3 +-
.../api/common/typeutils/base/FloatSerializer.java | 3 +-
.../typeutils/base/FloatValueSerializer.java | 3 +-
.../common/typeutils/base/InstantSerializer.java | 3 +-
.../api/common/typeutils/base/IntSerializer.java | 3 +-
.../common/typeutils/base/IntValueSerializer.java | 3 +-
.../api/common/typeutils/base/LongSerializer.java | 3 +-
.../common/typeutils/base/LongValueSerializer.java | 3 +-
.../common/typeutils/base/NullValueSerializer.java | 3 +-
.../api/common/typeutils/base/ShortSerializer.java | 3 +-
.../typeutils/base/ShortValueSerializer.java | 3 +-
.../common/typeutils/base/SqlDateSerializer.java | 3 +-
.../common/typeutils/base/SqlTimeSerializer.java | 3 +-
.../typeutils/base/SqlTimestampSerializer.java | 3 +-
.../common/typeutils/base/StringSerializer.java | 3 +-
.../typeutils/base/StringValueSerializer.java | 3 +-
.../typeutils/base/TypeSerializerSingleton.java | 12 ++-
.../api/common/typeutils/base/VoidSerializer.java | 3 +-
.../array/BooleanPrimitiveArraySerializer.java | 3 +-
.../base/array/BytePrimitiveArraySerializer.java | 3 +-
.../base/array/CharPrimitiveArraySerializer.java | 3 +-
.../base/array/DoublePrimitiveArraySerializer.java | 3 +-
.../base/array/FloatPrimitiveArraySerializer.java | 3 +-
.../base/array/IntPrimitiveArraySerializer.java | 3 +-
.../base/array/LongPrimitiveArraySerializer.java | 3 +-
.../base/array/ShortPrimitiveArraySerializer.java | 3 +-
.../base/array/StringArraySerializer.java | 3 +-
.../java/org/apache/flink/cep/nfa/DeweyNumber.java | 20 +++++
.../apache/flink/cep/nfa/NFAStateSerializer.java | 19 +++++
.../apache/flink/cep/nfa/sharedbuffer/EventId.java | 20 +++++
.../apache/flink/cep/nfa/sharedbuffer/NodeId.java | 19 +++++
.../cep/nfa/sharedbuffer/SharedBufferEdge.java | 20 +++++
.../cep/nfa/sharedbuffer/SharedBufferNode.java | 20 +++++
.../transform/LongValueWithProperHashCode.java | 21 +++++-
.../types/valuearray/ByteValueArraySerializer.java | 20 ++++-
.../types/valuearray/CharValueArraySerializer.java | 21 +++++-
.../valuearray/DoubleValueArraySerializer.java | 21 +++++-
.../valuearray/FloatValueArraySerializer.java | 21 +++++-
.../types/valuearray/IntValueArraySerializer.java | 21 +++++-
.../types/valuearray/LongValueArraySerializer.java | 21 +++++-
.../types/valuearray/NullValueArraySerializer.java | 20 +++++
.../valuearray/ShortValueArraySerializer.java | 21 +++++-
.../valuearray/StringValueArraySerializer.java | 21 +++++-
.../client/VoidNamespaceSerializer.java | 20 +++++
.../apache/flink/runtime/state/JavaSerializer.java | 20 +++++
.../runtime/state/VoidNamespaceSerializer.java | 20 +++++
.../flink/api/scala/typeutils/UnitSerializer.scala | 27 +++++++
.../api/windowing/windows/GlobalWindow.java | 20 +++++
.../api/windowing/windows/TimeWindow.java | 20 +++++
.../jar/CheckpointingCustomKvStateProgram.java | 20 +++++
64 files changed, 610 insertions(+), 149 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3e7cf2b..129ed66 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
@@ -1230,6 +1232,23 @@ public class FlinkKafkaProducer011<IN>
public boolean canEqual(Object obj) {
return obj instanceof TransactionStateSerializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
+ return new TransactionStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
+ public TransactionStateSerializerSnapshot() {
+ super(TransactionStateSerializer::new);
+ }
+ }
}
/**
@@ -1312,6 +1331,23 @@ public class FlinkKafkaProducer011<IN>
public boolean canEqual(Object obj) {
return obj instanceof ContextStateSerializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+ return new ContextStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+ public ContextStateSerializerSnapshot() {
+ super(ContextStateSerializer::new);
+ }
+ }
}
/**
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 10e8ef1..15b5b9f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
@@ -1236,6 +1238,24 @@ public class FlinkKafkaProducer<IN>
public boolean canEqual(Object obj) {
return obj instanceof FlinkKafkaProducer.TransactionStateSerializer;
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() {
+ return new TransactionStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
+
+ public TransactionStateSerializerSnapshot() {
+ super(TransactionStateSerializer::new);
+ }
+ }
}
/**
@@ -1318,6 +1338,24 @@ public class FlinkKafkaProducer<IN>
public boolean canEqual(Object obj) {
return obj instanceof FlinkKafkaProducer.ContextStateSerializer;
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+ return new ContextStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+
+ public ContextStateSerializerSnapshot() {
+ super(ContextStateSerializer::new);
+ }
+ }
}
/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
index 6fc6d17..29da90a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -27,8 +27,12 @@ import java.io.IOException;
/**
* A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
+ *
+ * @deprecated this snapshot class is no longer used by any serializers, and is maintained only
+ * for backward compatibility reasons. It is fully replaced by {@link SimpleTypeSerializerSnapshot}.
*/
@Internal
+@Deprecated
public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {
private static final int VERSION = 1;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
index 03fc1cb..dca34e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
@@ -21,15 +21,13 @@ package org.apache.flink.api.common.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* A simple base class for TypeSerializerSnapshots, for serializers that have no
@@ -46,24 +44,18 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
* backwards compatible code paths in case we decide to make this snapshot backwards compatible with
* the {@link ParameterlessTypeSerializerConfig}.
*/
- private static final int CURRENT_VERSION = 2;
+ private static final int CURRENT_VERSION = 3;
/** The class of the serializer for this snapshot.
* The field is null if the serializer was created for read and has not been read, yet. */
- @Nullable
- private Class<? extends TypeSerializer<T>> serializerClass;
-
- /**
- * Default constructor for instantiation on restore (reading the snapshot).
- */
- @SuppressWarnings("unused")
- public SimpleTypeSerializerSnapshot() {}
+ @Nonnull
+ private Supplier<? extends TypeSerializer<T>> serializerSupplier;
/**
* Constructor to create snapshot from serializer (writing the snapshot).
*/
- public SimpleTypeSerializerSnapshot(@Nonnull Class<? extends TypeSerializer<T>> serializerClass) {
- this.serializerClass = checkNotNull(serializerClass);
+ public SimpleTypeSerializerSnapshot(@Nonnull Supplier<? extends TypeSerializer<T>> serializerSupplier) {
+ this.serializerSupplier = checkNotNull(serializerSupplier);
}
// ------------------------------------------------------------------------
@@ -77,42 +69,39 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
@Override
public TypeSerializer<T> restoreSerializer() {
- checkState(serializerClass != null);
- return InstantiationUtil.instantiate(serializerClass);
+ return serializerSupplier.get();
}
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
- checkState(serializerClass != null);
- return newSerializer.getClass() == serializerClass ?
+ return newSerializer.getClass() == serializerSupplier.get().getClass() ?
TypeSerializerSchemaCompatibility.compatibleAsIs() :
TypeSerializerSchemaCompatibility.incompatible();
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
- checkState(serializerClass != null);
- out.writeUTF(serializerClass.getName());
+ //
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
switch (readVersion) {
- case 2:
- read(in, classLoader);
+ case 3: {
break;
- default:
+ }
+ case 2: {
+ // we don't need the classname any more; read and drop to maintain compatibility
+ in.readUTF();
+ break;
+ }
+ default: {
throw new IOException("Unrecognized version: " + readVersion);
+ }
}
}
- private void read(DataInputView in, ClassLoader classLoader) throws IOException {
- final String className = in.readUTF();
- final Class<?> clazz = resolveClassName(className, classLoader, false);
- this.serializerClass = cast(clazz);
- }
-
// ------------------------------------------------------------------------
// standard utilities
// ------------------------------------------------------------------------
@@ -131,45 +120,4 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
public String toString() {
return getClass().getName();
}
-
- // ------------------------------------------------------------------------
- // utilities
- // ------------------------------------------------------------------------
-
- private static Class<?> resolveClassName(String className, ClassLoader cl, boolean allowCanonicalName) throws IOException {
- try {
- return Class.forName(className, false, cl);
- }
- catch (ClassNotFoundException e) {
- if (allowCanonicalName) {
- try {
- return Class.forName(guessClassNameFromCanonical(className), false, cl);
- }
- catch (ClassNotFoundException ignored) {}
- }
-
- // throw with original ClassNotFoundException
- throw new IOException(
- "Failed to read SimpleTypeSerializerSnapshot: Serializer class not found: " + className, e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private static <T> Class<? extends TypeSerializer<T>> cast(Class<?> clazz) throws IOException {
- if (!TypeSerializer.class.isAssignableFrom(clazz)) {
- throw new IOException("Failed to read SimpleTypeSerializerSnapshot. " +
- "Serializer class name leads to a class that is not a TypeSerializer: " + clazz.getName());
- }
-
- return (Class<? extends TypeSerializer<T>>) clazz;
- }
-
- static String guessClassNameFromCanonical(String className) {
- int lastDot = className.lastIndexOf('.');
- if (lastDot > 0 && lastDot < className.length() - 1) {
- return className.substring(0, lastDot) + '$' + className.substring(lastDot + 1);
- } else {
- return className;
- }
- }
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
index ed5a3a0..7e4e97f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
@@ -151,10 +151,11 @@ public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal>
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BigDecSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigDecimal> {
public BigDecSerializerSnapshot() {
- super(BigDecSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
index b4c3a19..ef4358e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -156,10 +156,11 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger>
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BigIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigInteger> {
public BigIntSerializerSnapshot() {
- super(BigIntSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 21c558b..84056e5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -99,10 +99,11 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BooleanSerializerSnapshot extends SimpleTypeSerializerSnapshot<Boolean> {
public BooleanSerializerSnapshot() {
- super(BooleanSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index e66fe06..3e497ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -98,10 +98,11 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BooleanValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<BooleanValue> {
public BooleanValueSerializerSnapshot() {
- super(BooleanValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index ab9061f..5fdd9b6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -99,10 +99,11 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class ByteSerializerSnapshot extends SimpleTypeSerializerSnapshot<Byte> {
public ByteSerializerSnapshot() {
- super(ByteSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index e7d92bb..6f0cb76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -96,10 +96,11 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class ByteValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValue> {
public ByteValueSerializerSnapshot() {
- super(ByteValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 7805d54..cee0d0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -99,10 +99,11 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class CharSerializerSnapshot extends SimpleTypeSerializerSnapshot<Character> {
public CharSerializerSnapshot() {
- super(CharSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 697513a..aa580fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -96,10 +96,11 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class CharValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValue> {
public CharValueSerializerSnapshot() {
- super(CharValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 21c8432..0675e22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -114,10 +114,11 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class DateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
public DateSerializerSnapshot() {
- super(DateSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 447c548..cd4854b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -99,10 +99,11 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class DoubleSerializerSnapshot extends SimpleTypeSerializerSnapshot<Double> {
public DoubleSerializerSnapshot() {
- super(DoubleSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 27b9fe2..c0c6a27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -96,10 +96,11 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class DoubleValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValue> {
public DoubleValueSerializerSnapshot() {
- super(DoubleValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b88fbaf..6da6f47 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -99,10 +99,11 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class FloatSerializerSnapshot extends SimpleTypeSerializerSnapshot<Float> {
public FloatSerializerSnapshot() {
- super(FloatSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index f5292ce..1bfc87d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -96,10 +96,11 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class FloatValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValue> {
public FloatValueSerializerSnapshot() {
- super(FloatValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
index d39aba2..17f1b7a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -115,10 +115,11 @@ public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class InstantSerializerSnapshot extends SimpleTypeSerializerSnapshot<Instant> {
public InstantSerializerSnapshot() {
- super(InstantSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 0c87786..6d0d407 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -101,8 +101,9 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> {
*/
public static final class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
+ @SuppressWarnings("WeakerAccess")
public IntSerializerSnapshot() {
- super(IntSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index fae1bb6..846df31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -96,10 +96,11 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue>
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class IntValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValue> {
public IntValueSerializerSnapshot() {
- super(IntValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 8100307..24cf7c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -99,10 +99,11 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class LongSerializerSnapshot extends SimpleTypeSerializerSnapshot<Long> {
public LongSerializerSnapshot() {
- super(LongSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 2d1fea8..9f4b5d8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -96,10 +96,11 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class LongValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValue> {
public LongValueSerializerSnapshot() {
- super(LongValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
index 233cc56..48c0b57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
@@ -92,10 +92,11 @@ public final class NullValueSerializer extends TypeSerializerSingleton<NullValue
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class NullValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValue> {
public NullValueSerializerSnapshot() {
- super(NullValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index d1c78ec..fa6189d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -99,10 +99,11 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class ShortSerializerSnapshot extends SimpleTypeSerializerSnapshot<Short> {
public ShortSerializerSnapshot() {
- super(ShortSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index b8f815b..e667faa4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -96,10 +96,11 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class ShortValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValue> {
public ShortValueSerializerSnapshot() {
- super(ShortValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
index 6b780a3..ab96dd9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
@@ -115,10 +115,11 @@ public final class SqlDateSerializer extends TypeSerializerSingleton<Date> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class SqlDateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
public SqlDateSerializerSnapshot() {
- super(SqlDateSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
index 9218ca1..b5bf7b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
@@ -119,10 +119,11 @@ public final class SqlTimeSerializer extends TypeSerializerSingleton<Time> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class SqlTimeSerializerSnapshot extends SimpleTypeSerializerSnapshot<Time> {
public SqlTimeSerializerSnapshot() {
- super(SqlTimeSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
index e35c78b..5d7760c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
@@ -123,10 +123,11 @@ public final class SqlTimestampSerializer extends TypeSerializerSingleton<Timest
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class SqlTimestampSerializerSnapshot extends SimpleTypeSerializerSnapshot<Timestamp> {
public SqlTimestampSerializerSnapshot() {
- super(SqlTimestampSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 0761779..dc67a9c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -100,10 +100,11 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class StringSerializerSnapshot extends SimpleTypeSerializerSnapshot<String> {
public StringSerializerSnapshot() {
- super(StringSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 9075cf4..498b0f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -121,10 +121,11 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class StringValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValue> {
public StringValueSerializerSnapshot() {
- super(StringValueSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 4f01d76..437aa14 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@Internal
public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
@@ -53,13 +52,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
}
}
+ /**
+ * @deprecated this is kept around for backwards compatibility.
+ * Can only be removed when {@link ParameterlessTypeSerializerConfig} is removed.
+ */
@Override
- public TypeSerializerSnapshot<T> snapshotConfiguration() {
- // type serializer singletons should always be parameter-less
- return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
- }
-
- @Override
+ @Deprecated
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof ParameterlessTypeSerializerConfig
&& isCompatibleSerializationFormatIdentifier(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index ec87e2b..d9fae48 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -100,10 +100,11 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class VoidSerializerSnapshot extends SimpleTypeSerializerSnapshot<Void> {
public VoidSerializerSnapshot() {
- super(VoidSerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index ecfd604..a8f76c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -120,10 +120,11 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BooleanPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<boolean[]> {
public BooleanPrimitiveArraySerializerSnapshot() {
- super(BooleanPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index 414ef17..803bd53 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -113,10 +113,11 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class BytePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<byte[]> {
public BytePrimitiveArraySerializerSnapshot() {
- super(BytePrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index f08062b..1e2357a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class CharPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<char[]> {
public CharPrimitiveArraySerializerSnapshot() {
- super(CharPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index a38ae66..5deddc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class DoublePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<double[]> {
public DoublePrimitiveArraySerializerSnapshot() {
- super(DoublePrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index d447793..f65bcf3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class FloatPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<float[]> {
public FloatPrimitiveArraySerializerSnapshot() {
- super(FloatPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index cbfa04d..7923eca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class IntPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<int[]> {
public IntPrimitiveArraySerializerSnapshot() {
- super(IntPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index f56978c..66afb32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class LongPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<long[]> {
public LongPrimitiveArraySerializerSnapshot() {
- super(LongPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index b9dd1a9..a93dd71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class ShortPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<short[]> {
public ShortPrimitiveArraySerializerSnapshot() {
- super(ShortPrimitiveArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index 1cdf317..7c81aa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -123,9 +123,10 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
+ @SuppressWarnings("WeakerAccess")
public static final class StringArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<String[]> {
public StringArraySerializerSnapshot() {
- super(StringArraySerializer.class);
+ super(() -> INSTANCE);
}
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 68e0eec..53fffce 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,7 +18,9 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
@@ -272,5 +274,23 @@ public class DeweyNumber implements Serializable {
public int hashCode() {
return elemSerializer.hashCode();
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<DeweyNumber> snapshotConfiguration() {
+ return new DeweyNumberSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class DeweyNumberSerializerSnapshot extends SimpleTypeSerializerSnapshot<DeweyNumber> {
+
+ public DeweyNumberSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index 05b6c91..ccbe25c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -18,7 +18,9 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -187,4 +189,21 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
return true;
}
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<NFAState> snapshotConfiguration() {
+ return new NFAStateSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
+
+ public NFAStateSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index c1a6ccb..045cf38 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -18,7 +18,9 @@
package org.apache.flink.cep.nfa.sharedbuffer;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -147,5 +149,23 @@ public class EventId implements Comparable<EventId> {
public boolean canEqual(Object obj) {
return obj.getClass().equals(EventIdSerializer.class);
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<EventId> snapshotConfiguration() {
+ return new EventIdSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class EventIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<EventId> {
+
+ public EventIdSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index 3a13184..87dc2c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa.sharedbuffer;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -151,5 +153,22 @@ public class NodeId {
return obj.getClass().equals(NodeIdSerializer.class);
}
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
+ return new NodeIdSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
+
+ public NodeIdSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index c8d9021..2af92f5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa.sharedbuffer;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.cep.nfa.DeweyNumber;
import org.apache.flink.core.memory.DataInputView;
@@ -122,5 +124,23 @@ public class SharedBufferEdge {
public boolean canEqual(Object obj) {
return obj.getClass().equals(SharedBufferEdgeSerializer.class);
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
+ return new SharedBufferEdgeSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
+
+ public SharedBufferEdgeSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index b613625..96afdbb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa.sharedbuffer;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
@@ -116,5 +118,23 @@ public class SharedBufferNode {
public boolean canEqual(Object obj) {
return obj.getClass().equals(SharedBufferNodeSerializer.class);
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
+ return new SharedBufferNodeSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class SharedBufferNodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferNode> {
+
+ public SharedBufferNodeSerializerSnapshot() {
+ super(SharedBufferNodeSerializer::new);
+ }
+ }
}
}
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
index 22a681c..6fc7bd0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongValueComparator;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -220,10 +221,22 @@ extends LongValue {
return obj instanceof LongValueWithProperHashCodeSerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongSerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<LongValueWithProperHashCode> snapshotConfiguration() {
+ return new LongValueWithProperHashCodeSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class LongValueWithProperHashCodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueWithProperHashCode> {
+
+ public LongValueWithProperHashCodeSerializerSnapshot() {
+ super(LongValueWithProperHashCodeSerializer::new);
+ }
}
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
index a49359a..c690543 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
@@ -19,8 +19,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -85,9 +86,20 @@ public final class ByteValueArraySerializer extends TypeSerializerSingleton<Byte
return obj instanceof ByteValueArraySerializer;
}
+ // ------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<ByteValueArray> snapshotConfiguration() {
+ return new ByteValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ByteValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValueArray> {
+ public ByteValueArraySerializerSnapshot() {
+ super(ByteValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
index 4d97cdb..0844986 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
@@ -19,8 +19,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -85,9 +86,21 @@ public final class CharValueArraySerializer extends TypeSerializerSingleton<Char
return obj instanceof CharValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<CharValueArray> snapshotConfiguration() {
+ return new CharValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class CharValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValueArray> {
+
+ public CharValueArraySerializerSnapshot() {
+ super(CharValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
index 5808946..be683b5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
@@ -19,8 +19,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -85,9 +86,21 @@ public final class DoubleValueArraySerializer extends TypeSerializerSingleton<Do
return obj instanceof DoubleValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<DoubleValueArray> snapshotConfiguration() {
+ return new DoubleValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class DoubleValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValueArray> {
+
+ public DoubleValueArraySerializerSnapshot() {
+ super(DoubleValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
index fc78fd2..0fb0f61 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
@@ -19,8 +19,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -85,9 +86,21 @@ public final class FloatValueArraySerializer extends TypeSerializerSingleton<Flo
return obj instanceof FloatValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<FloatValueArray> snapshotConfiguration() {
+ return new FloatValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class FloatValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValueArray> {
+
+ public FloatValueArraySerializerSnapshot() {
+ super(FloatValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index 5984122..c00c136 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -18,8 +18,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -84,9 +85,21 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
return obj instanceof IntValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<IntValueArray> snapshotConfiguration() {
+ return new IntValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class IntValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValueArray> {
+
+ public IntValueArraySerializerSnapshot() {
+ super(IntValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index ade46ca..55a79b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -18,8 +18,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -84,9 +85,21 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
return obj instanceof LongValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<LongValueArray> snapshotConfiguration() {
+ return new LongValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class LongValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueArray> {
+
+ public LongValueArraySerializerSnapshot() {
+ super(LongValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
index 233ed20..732bb0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +84,22 @@ public final class NullValueArraySerializer extends TypeSerializerSingleton<Null
public boolean canEqual(Object obj) {
return obj instanceof NullValueArraySerializer;
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<NullValueArray> snapshotConfiguration() {
+ return new NullValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class NullValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValueArray> {
+
+ public NullValueArraySerializerSnapshot() {
+ super(NullValueArraySerializer::new);
+ }
+ }
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
index bce4d81..2000799 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
@@ -19,8 +19,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -85,9 +86,21 @@ public final class ShortValueArraySerializer extends TypeSerializerSingleton<Sho
return obj instanceof ShortValueArraySerializer;
}
+ // ------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<ShortValueArray> snapshotConfiguration() {
+ return new ShortValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class ShortValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValueArray> {
+
+ public ShortValueArraySerializerSnapshot() {
+ super(ShortValueArraySerializer::new);
+ }
}
}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 6dbe0e5..d3dbe70 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -18,8 +18,9 @@
package org.apache.flink.graph.types.valuearray;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -84,9 +85,21 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
return obj instanceof StringValueArraySerializer;
}
+ // -----------------------------------------------------------------------------------
+
@Override
- protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
- return super.isCompatibleSerializationFormatIdentifier(identifier)
- || identifier.equals(StringArraySerializer.class.getCanonicalName());
+ public TypeSerializerSnapshot<StringValueArray> snapshotConfiguration() {
+ return new StringValueArraySerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class StringValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValueArray> {
+
+ public StringValueArraySerializerSnapshot() {
+ super(StringValueArraySerializer::new);
+ }
}
}
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
index 38db705..e6a9f10 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
@@ -19,6 +19,8 @@
package org.apache.flink.queryablestate.client;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -93,4 +95,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
public boolean canEqual(Object obj) {
return obj instanceof VoidNamespaceSerializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+ return new VoidNamespaceSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+
+ public VoidNamespaceSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 7d9e888..60b5fd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
@@ -97,4 +99,22 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
public boolean canEqual(Object obj) {
return obj instanceof JavaSerializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<T> snapshotConfiguration() {
+ return new JavaSerializerSnapshot<>();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class JavaSerializerSnapshot<T extends Serializable> extends SimpleTypeSerializerSnapshot<T> {
+
+ public JavaSerializerSnapshot() {
+ super(JavaSerializer::new);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..c861acf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -89,4 +91,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
public boolean canEqual(Object obj) {
return obj instanceof VoidNamespaceSerializer;
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+ return new VoidNamespaceSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+
+ public VoidNamespaceSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
index 32c44d2..8a49b18 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
@@ -17,8 +17,12 @@
*/
package org.apache.flink.api.scala.typeutils
+import java.util.function.Supplier
+
import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.api.scala.typeutils.UnitSerializer.UnitSerializerSnapshot
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
@Internal
@@ -58,4 +62,27 @@ class UnitSerializer extends TypeSerializerSingleton[Unit] {
override def canEqual(obj: scala.Any): Boolean = {
obj.isInstanceOf[UnitSerializer]
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ def snapshotConfiguration(): TypeSerializerSnapshot[Unit] = {
+ new UnitSerializerSnapshot()
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+}
+
+object UnitSerializer {
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ final class UnitSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot[Unit](
+ new Supplier[TypeSerializer[Unit]] {
+ override def get() = new UnitSerializer()
+ })
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index 835197f..0e09b2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -19,7 +19,9 @@
package org.apache.flink.streaming.api.windowing.windows;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -120,5 +122,23 @@ public class GlobalWindow extends Window {
public boolean canEqual(Object obj) {
return obj instanceof Serializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<GlobalWindow> snapshotConfiguration() {
+ return new GlobalWindowSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class GlobalWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<GlobalWindow> {
+
+ public GlobalWindowSerializerSnapshot() {
+ super(GlobalWindow.Serializer::new);
+ }
+ }
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0e89294..a778ebf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -19,6 +19,8 @@
package org.apache.flink.streaming.api.windowing.windows;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
@@ -188,6 +190,24 @@ public class TimeWindow extends Window {
public boolean canEqual(Object obj) {
return obj instanceof Serializer;
}
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<TimeWindow> snapshotConfiguration() {
+ return new TimeWindowSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<TimeWindow> {
+
+ public TimeWindowSerializerSnapshot() {
+ super(Serializer::new);
+ }
+ }
}
// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index aa8e59e..67ada5d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -230,5 +232,23 @@ public class CheckpointingCustomKvStateProgram {
public boolean canEqual(Object obj) {
return obj instanceof CustomIntSerializer;
}
+
+ // -----------------------------------------------------------------------------------
+
+ @Override
+ public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
+ return new CustomIntSerializerSnapshot();
+ }
+
+ /**
+ * Serializer configuration snapshot for compatibility and format evolution.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final class CustomIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
+
+ public CustomIntSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
}
}