You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/04 12:02:12 UTC

[flink] branch release-1.14 updated (9365510 -> b9f7a4f)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9365510  [FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf
     new 150f768  [FLINK-21752] NullPointerException on restore in PojoSerializer
     new b9f7a4f  [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
 .../typeutils/TypeSerializerUpgradeTestBase.java   |  8 ++++-
 2 files changed, 32 insertions(+), 10 deletions(-)

[flink] 01/02: [FLINK-21752] NullPointerException on restore in PojoSerializer

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 150f768573cb526a2c67123da61f64b199d0ea7a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 13:22:37 2022 +0100

    [FLINK-21752] NullPointerException on restore in PojoSerializer
    
    In order to support Pojo schema migration, we added a new ctor to the
    PojoSerializer, which uses data extracted from a snapshot. However, the
    duplicate method still used the old ctor, which tries to recreate parts
    of the data from the current context.
    
    We should use the same ctor as we use for schema migration in the
    duplicate methods. We must make sure, though, that all serializers are
    properly duplicated.
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7182f68..30f4880 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -50,6 +50,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -163,12 +164,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
     @Override
     public PojoSerializer<T> duplicate() {
+        TypeSerializer<Object>[] duplicateFieldSerializers = duplicateSerializers(fieldSerializers);
+        TypeSerializer<Object>[] duplicateRegisteredSerializers =
+                duplicateSerializers(registeredSerializers);
+
+        return new PojoSerializer<>(
+                clazz,
+                fields,
+                duplicateFieldSerializers,
+                new LinkedHashMap<>(registeredClasses),
+                duplicateRegisteredSerializers,
+                subclassSerializerCache.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(Map.Entry::getKey, e -> e.getValue().duplicate())),
+                executionConfig);
+    }
+
+    @SuppressWarnings("unchecked")
+    private TypeSerializer<Object>[] duplicateSerializers(TypeSerializer<?>[] serializers) {
         boolean stateful = false;
-        TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+        TypeSerializer<?>[] duplicateSerializers = new TypeSerializer[serializers.length];
 
-        for (int i = 0; i < fieldSerializers.length; i++) {
-            duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
-            if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+        for (int i = 0; i < serializers.length; i++) {
+            duplicateSerializers[i] = serializers[i].duplicate();
+            if (duplicateSerializers[i] != serializers[i]) {
                 // at least one of them is stateful
                 stateful = true;
             }
@@ -176,12 +195,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
         if (!stateful) {
             // as a small memory optimization, we can share the same object between instances
-            duplicateFieldSerializers = fieldSerializers;
+            duplicateSerializers = serializers;
         }
-
-        // we must create a new instance, otherwise the subclassSerializerCache can create
-        // concurrency problems
-        return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
+        return (TypeSerializer<Object>[]) duplicateSerializers;
     }
 
     @Override

[flink] 02/02: [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9f7a4f02e969cf98d7b3acc549b27adad234079
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 12:25:07 2022 +0100

    [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests
    
    We should verify that a TypeSerializer can be safely duplicated even
    after migration. In order to do that, we can duplicate a new serializer
    in the TypeSerializerUpgradeTestBase before we try to use it for
    deserialization.
---
 .../flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 0dbe642..0c5ad47 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -421,7 +421,13 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
                 readAndThenWriteData(dataInput, serializer, serializer, testDataMatcher);
         TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
         TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
-        readAndThenWriteData(serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+        serializedData =
+                readAndThenWriteData(
+                        serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+
+        TypeSerializer<T> duplicateSerializer = snapshot.restoreSerializer().duplicate();
+        readAndThenWriteData(
+                serializedData, duplicateSerializer, duplicateSerializer, testDataMatcher);
     }
 
     // ------------------------------------------------------------------------------