Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/03 11:36:31 UTC

[GitHub] [flink] dawidwys commented on a diff in pull request #21201: [FLINK-29807] Drop TypeSerializerConfigSnapshot and savepoint support from Flink versions < 1.8.0

dawidwys commented on code in PR #21201:
URL: https://github.com/apache/flink/pull/21201#discussion_r1012785767


##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java:
##########
@@ -36,8 +32,6 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
-import java.util.ArrayList;
-import java.util.List;

Review Comment:
   +1, I believe we should remove the class; it is used only in tests.



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerUtils.java:
##########
@@ -23,46 +23,18 @@
 /** Utilities for dealing with the {@link TypeSerializer} and the {@link TypeSerializerSnapshot}. */
 public final class TypeSerializerUtils {
 
-    /**
-     * Takes snapshots of the given serializers. In case where the snapshots are still extending the
-     * old {@code TypeSerializerConfigSnapshot} class, the snapshots are set up properly (with their
-     * originating serializer) such that the backwards compatible code paths work.
-     */
-    public static TypeSerializerSnapshot<?>[] snapshotBackwardsCompatible(
+    /** Takes snapshots of the given serializers. */
+    public static TypeSerializerSnapshot<?>[] snapshot(
             TypeSerializer<?>... originatingSerializers) {
 
         return Arrays.stream(originatingSerializers)
-                .map(TypeSerializerUtils::snapshotBackwardsCompatible)
+                .map(TypeSerializerUtils::snapshot)
                 .toArray(TypeSerializerSnapshot[]::new);
     }
 
-    /**
-     * Takes a snapshot of the given serializer. In case where the snapshot is still extending the
-     * old {@code TypeSerializerConfigSnapshot} class, the snapshot is set up properly (with its
-     * originating serializer) such that the backwards compatible code paths work.
-     */
-    public static <T> TypeSerializerSnapshot<T> snapshotBackwardsCompatible(
-            TypeSerializer<T> originatingSerializer) {
-        return configureForBackwardsCompatibility(
-                originatingSerializer.snapshotConfiguration(), originatingSerializer);
-    }
-
-    /**
-     * Utility method to bind the serializer and serializer snapshot to a common generic type
-     * variable.
-     */
-    @SuppressWarnings({"unchecked", "deprecation"})
-    private static <T> TypeSerializerSnapshot<T> configureForBackwardsCompatibility(
-            TypeSerializerSnapshot<?> snapshot, TypeSerializer<?> serializer) {
-
-        TypeSerializerSnapshot<T> typedSnapshot = (TypeSerializerSnapshot<T>) snapshot;
-        TypeSerializer<T> typedSerializer = (TypeSerializer<T>) serializer;
-
-        if (snapshot instanceof TypeSerializerConfigSnapshot) {
-            ((TypeSerializerConfigSnapshot<T>) typedSnapshot).setPriorSerializer(typedSerializer);
-        }
-
-        return typedSnapshot;
+    /** Takes a snapshot of the given serializer. */
+    public static <T> TypeSerializerSnapshot<T> snapshot(TypeSerializer<T> originatingSerializer) {
+        return originatingSerializer.snapshotConfiguration();

Review Comment:
   Is this function still helpful? Can't we just inline it?
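
   To make the question concrete, here is a minimal sketch of the inlined form. `SerializerSketch`, `SerializerSnapshotSketch` and `SerializerUtilsSketch` are hypothetical stand-ins for `TypeSerializer`, `TypeSerializerSnapshot` and `TypeSerializerUtils`, not the real Flink classes; the only thing assumed from the diff is that taking a snapshot is now a plain call to `snapshotConfiguration()`.

```java
import java.util.Arrays;

/** Hypothetical stand-in for TypeSerializerSnapshot; carries no state in this sketch. */
interface SerializerSnapshotSketch<T> {}

/** Hypothetical stand-in for TypeSerializer, reduced to the one method used here. */
interface SerializerSketch<T> {
    SerializerSnapshotSketch<T> snapshotConfiguration();
}

/** Hypothetical stand-in for TypeSerializerUtils. */
final class SerializerUtilsSketch {

    private SerializerUtilsSketch() {}

    /** Takes snapshots of the given serializers without a single-argument helper. */
    static SerializerSnapshotSketch<?>[] snapshot(SerializerSketch<?>... originatingSerializers) {
        return Arrays.stream(originatingSerializers)
                // The one-argument wrapper only delegated, so the method is referenced directly.
                .map(SerializerSketch::snapshotConfiguration)
                .toArray(SerializerSnapshotSketch[]::new);
    }
}
```

   With this shape, callers that hold a single serializer would call `snapshotConfiguration()` on it directly, and only the varargs helper would remain.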



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java:
##########
@@ -65,12 +57,10 @@ public static <K, N> InternalTimersSnapshotWriter getWriterForVersion(
 
         switch (version) {
             case NO_VERSION:
-                return new InternalTimersSnapshotWriterPreVersioned<>(
-                        timersSnapshot, keySerializer, namespaceSerializer);
-
             case 1:
-                return new InternalTimersSnapshotWriterV1<>(
-                        timersSnapshot, keySerializer, namespaceSerializer);
+                throw new UnsupportedOperationException(

Review Comment:
   nit: I'd rather use `IllegalStateException` 
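
   A minimal sketch of what I have in mind. The class name, the `NO_VERSION` value, the message texts and the string return value are placeholders chosen for illustration; the real method returns a writer object.

```java
/** Hypothetical, simplified version switch; not the actual Flink implementation. */
final class TimersWriterSelectionSketch {

    /** Placeholder for the "not versioned" marker; the value is an assumption. */
    static final int NO_VERSION = Integer.MIN_VALUE;

    private TimersWriterSelectionSketch() {}

    /** Returns the name of the writer that would be used for the given version. */
    static String writerNameForVersion(int version) {
        switch (version) {
            case NO_VERSION:
            case 1:
                // Reaching this branch means the caller asked for a format that was dropped,
                // which is treated here as an invalid state rather than an unsupported operation.
                throw new IllegalStateException(
                        "Timer snapshot version " + version + " is no longer supported.");
            case 2:
                return "InternalTimersSnapshotWriterV2";
            default:
                throw new IllegalStateException(
                        "Unrecognized timer snapshot version: " + version);
        }
    }
}
```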



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java:
##########
@@ -145,8 +140,12 @@ public void read(DataInputView in) throws IOException {
                     serializerSnapshot = deserializeV2(in, userCodeClassLoader);
                     break;
                 case 1:
-                    serializerSnapshot = deserializeV1(in, userCodeClassLoader, serializer);
-                    break;
+                    throw new IOException(
+                            String.format(
+                                    "No longer supported version [%d] for TypeSerializerSnapshot. "
+                                            + "Please migrate away from the old TypeSerializerConfigSnapshot "
+                                            + "and use Flink 1.16 for the migration",
+                                    version));

Review Comment:
   I find the current way we check for incompatible versions confusing.
   There are at least two places that throw a similar exception:
   1. `VersionedIOReadableWritable`, via `getIncompatibleVersionError`
   2. Here, in the `read` method.
   
   I think 1) is actually dead code now, since `TypeSerializerSnapshotSerializationProxy` still declares that it supports version 1.
   
   Moreover, I find the method `KeyedBackendSerializationProxy#getIncompatibleVersionError` misleading. At the very least, the `Ops, this should not happen...` part is unnecessary. You already get a message from `VersionedIOReadableWritable` saying that the version is unsupported, so this text does not add any context.
   
   Lastly, if you keep the `getIncompatibleVersionError` method, I'd rename it to something along the lines of "extra information", e.g. `getAdditionalDetailsForIncompatibleVersion`.
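
   One possible way to consolidate this, sketched with hypothetical classes (`VersionedReaderSketch` and `SnapshotProxySketch` are simplified stand-ins, not the real `VersionedIOReadableWritable` or the serialization proxy). The assumption is that the proxy stops declaring version 1 as compatible, so the generic check becomes the single place that rejects it, and the renamed hook only contributes extra context.

```java
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical, simplified stand-in for a versioned reader. */
abstract class VersionedReaderSketch {

    /** Versions this reader can still deserialize. */
    abstract int[] getCompatibleVersions();

    /** Optional extra context appended to the generic "unsupported version" message. */
    String getAdditionalDetailsForIncompatibleVersion(int foundVersion) {
        return "";
    }

    /** The single place where an unsupported version is rejected. */
    final void checkVersion(int foundVersion) throws IOException {
        for (int compatible : getCompatibleVersions()) {
            if (compatible == foundVersion) {
                return;
            }
        }
        String details = getAdditionalDetailsForIncompatibleVersion(foundVersion);
        throw new IOException(
                String.format(
                        "Unsupported version %d; supported versions are %s.%s",
                        foundVersion,
                        Arrays.toString(getCompatibleVersions()),
                        details.isEmpty() ? "" : " " + details));
    }
}

/** Hypothetical proxy that no longer lists version 1 as compatible. */
final class SnapshotProxySketch extends VersionedReaderSketch {

    @Override
    int[] getCompatibleVersions() {
        return new int[] {2};
    }

    @Override
    String getAdditionalDetailsForIncompatibleVersion(int foundVersion) {
        return "Please migrate away from the old TypeSerializerConfigSnapshot "
                + "and use Flink 1.16 for the migration.";
    }
}
```

   In this sketch, `new SnapshotProxySketch().checkVersion(1)` fails with one message that combines the generic part and the migration hint, so the `read` method would not need its own `case 1` branch.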



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java:
##########
@@ -145,8 +140,12 @@ public void read(DataInputView in) throws IOException {
                     serializerSnapshot = deserializeV2(in, userCodeClassLoader);
                     break;
                 case 1:
-                    serializerSnapshot = deserializeV1(in, userCodeClassLoader, serializer);
-                    break;
+                    throw new IOException(
+                            String.format(
+                                    "No longer supported version [%d] for TypeSerializerSnapshot. "
+                                            + "Please migrate away from the old TypeSerializerConfigSnapshot "
+                                            + "and use Flink 1.16 for the migration",
+                                    version));

Review Comment:
   Could we also clean up this class a bit? I believe `org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.TypeSerializerSnapshotSerializationProxy#serializer`:
   1. is never accessed
   2. is always null (apart from tests)



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java:
##########
@@ -223,10 +167,10 @@ public static <K, N> InternalTimersSnapshotReader<K, N> getReaderForVersion(
 
         switch (version) {
             case NO_VERSION:
-                return new InternalTimersSnapshotReaderPreVersioned<>(userCodeClassLoader);
-
             case 1:
-                return new InternalTimersSnapshotReaderV1<>(userCodeClassLoader);
+                throw new UnsupportedOperationException(

Review Comment:
   `IllegalStateException`?


