You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/08/08 07:58:14 UTC
[flink] branch master updated: [FLINK-13159] Fix incorrect subclass
serializer reconfiguration in PojoSerializer
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 268da6a [FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
268da6a is described below
commit 268da6a03323b65d6297e4d2288ead39aba7d388
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Aug 8 07:34:04 2019 +0200
[FLINK-13159] Fix incorrect subclass serializer reconfiguration in PojoSerializer
---
.../typeutils/runtime/PojoSerializerSnapshot.java | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
index 9987fae..95276d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java
@@ -474,13 +474,21 @@ public class PojoSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
Iterator<TypeSerializer<?>> serializersForPreexistingRegistrations =
Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator();
- for (Map.Entry<Class<?>, TypeSerializer<?>> registration : newSubclassRegistrations.entrySet()) {
- // new registrations should simply be appended to the subclass serializer registry with their new serializers;
- // preexisting registrations should use the compatibility-checked serializer
- TypeSerializer<?> newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey()))
- ? serializersForPreexistingRegistrations.next()
- : registration.getValue();
- reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration);
+ // first, replace all restored serializers of subclasses that co-exist in
+ // the previous and new registrations, with the compatibility-checked serializers
+ for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) {
+ if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) {
+ oldRegistration.setValue(serializersForPreexistingRegistrations.next());
+ }
+ }
+
+ // then, for all new registrations that did not exist before, simply append them to the registry with their
+ // new serializers
+ for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : newSubclassRegistrations.entrySet()) {
+ TypeSerializer<?> oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey());
+ if (oldRegistration == null) {
+ reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue());
+ }
}
return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);