You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/04 12:03:01 UTC

[flink] branch release-1.13 updated (7012130 -> 695372c)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7012130  [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
     new 3d2329e  [FLINK-21752] NullPointerException on restore in PojoSerializer
     new 695372c  [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
 .../typeutils/TypeSerializerUpgradeTestBase.java   |  7 ++++-
 2 files changed, 31 insertions(+), 10 deletions(-)

[flink] 01/02: [FLINK-21752] NullPointerException on restore in PojoSerializer

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d2329e9744eb89e31669f2096c4bce7dde7898a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 13:22:37 2022 +0100

    [FLINK-21752] NullPointerException on restore in PojoSerializer
    
    In order to support Pojo schema migration, we added a new ctor to the
    PojoSerializer, which uses data extracted from a snapshot. However, the
    duplicate method still used the old ctor, which tries to recreate parts
    of the data from the current context.
    
    We should use the same ctor as we use for schema migration in the
    duplicate methods. We must make sure, though, that all serializers are
    properly duplicated.
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7182f68..30f4880 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -50,6 +50,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -163,12 +164,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
     @Override
     public PojoSerializer<T> duplicate() {
+        TypeSerializer<Object>[] duplicateFieldSerializers = duplicateSerializers(fieldSerializers);
+        TypeSerializer<Object>[] duplicateRegisteredSerializers =
+                duplicateSerializers(registeredSerializers);
+
+        return new PojoSerializer<>(
+                clazz,
+                fields,
+                duplicateFieldSerializers,
+                new LinkedHashMap<>(registeredClasses),
+                duplicateRegisteredSerializers,
+                subclassSerializerCache.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(Map.Entry::getKey, e -> e.getValue().duplicate())),
+                executionConfig);
+    }
+
+    @SuppressWarnings("unchecked")
+    private TypeSerializer<Object>[] duplicateSerializers(TypeSerializer<?>[] serializers) {
         boolean stateful = false;
-        TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+        TypeSerializer<?>[] duplicateSerializers = new TypeSerializer[serializers.length];
 
-        for (int i = 0; i < fieldSerializers.length; i++) {
-            duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
-            if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+        for (int i = 0; i < serializers.length; i++) {
+            duplicateSerializers[i] = serializers[i].duplicate();
+            if (duplicateSerializers[i] != serializers[i]) {
                 // at least one of them is stateful
                 stateful = true;
             }
@@ -176,12 +195,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
         if (!stateful) {
             // as a small memory optimization, we can share the same object between instances
-            duplicateFieldSerializers = fieldSerializers;
+            duplicateSerializers = serializers;
         }
-
-        // we must create a new instance, otherwise the subclassSerializerCache can create
-        // concurrency problems
-        return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
+        return (TypeSerializer<Object>[]) duplicateSerializers;
     }
 
     @Override

[flink] 02/02: [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 695372cd0a5bece795d1248f32e46ec9748a40e8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 12:25:07 2022 +0100

    [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests
    
    We should verify that a TypeSerializer, even after migration, can be
    safely duplicated. In order to do that, we can duplicate a new serializer
    in the TypeSerializerUpgradeTestBase before we try to use it for
    deserialization.
---
 .../flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 9751e2d..f319528 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -431,8 +431,13 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
         if (!isRestoreSerializer) {
             TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
             TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
+            serializedData =
+                    readAndThenWriteData(
+                            serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+
+            TypeSerializer<T> duplicateSerializer = snapshot.restoreSerializer().duplicate();
             readAndThenWriteData(
-                    serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+                    serializedData, duplicateSerializer, duplicateSerializer, testDataMatcher);
         }
     }