You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/08 07:58:14 UTC

[flink] branch master updated: [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 268da6a  [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
268da6a is described below

commit 268da6a03323b65d6297e4d2288ead39aba7d388
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Aug 8 07:34:04 2019 +0200

    [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
---
 .../typeutils/runtime/PojoSerializerSnapshot.java  | 22 +++++++++++++++-------
 1 file changed, 15 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
index 9987fae..95276d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
@@ -474,13 +474,21 @@ public class PojoSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
 		Iterator<TypeSerializer<?>> serializersForPreexistingRegistrations =
 			Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator();
 
-		for (Map.Entry<Class<?>, TypeSerializer<?>> registration : newSubclassRegistrations.entrySet()) {
-			// new registrations should simply be appended to the subclass serializer registry with their new serializers;
-			// preexisting registrations should use the compatibility-checked serializer
-			TypeSerializer<?> newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey()))
-				? serializersForPreexistingRegistrations.next()
-				: registration.getValue();
-			reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration);
+		// first, replace all restored serializers of subclasses that co-exist in
+		// the previous and new registrations, with the compatibility-checked serializers
+		for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) {
+			if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) {
+				oldRegistration.setValue(serializersForPreexistingRegistrations.next());
+			}
+		}
+
+		// then, for all new registration that did not exist before, append it to the registry simply with their
+		// new serializers
+		for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : newSubclassRegistrations.entrySet()) {
+			TypeSerializer<?> oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey());
+			if (oldRegistration == null) {
+				reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue());
+			}
 		}
 
 		return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);