You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2022/02/04 12:03:01 UTC
[flink] branch release-1.13 updated (7012130 -> 695372c)
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 7012130 [FLINK-25653][network] Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
new 3d2329e [FLINK-21752] NullPointerException on restore in PojoSerializer
new 695372c [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
.../typeutils/TypeSerializerUpgradeTestBase.java | 7 ++++-
2 files changed, 31 insertions(+), 10 deletions(-)
[flink] 01/02: [FLINK-21752] NullPointerException on restore in PojoSerializer
Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d2329e9744eb89e31669f2096c4bce7dde7898a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 13:22:37 2022 +0100
[FLINK-21752] NullPointerException on restore in PojoSerializer
In order to support POJO schema migration, we added a new constructor to the
PojoSerializer, which uses data extracted from a snapshot. However, the
duplicate method still used the old constructor, which tries to recreate parts
of the data from the current context.
We should use the same constructor in the duplicate method as we use for
schema migration. We must, however, make sure that all serializers are
properly duplicated.
---
.../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
1 file changed, 25 insertions(+), 9 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7182f68..30f4880 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -50,6 +50,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -163,12 +164,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
@Override
public PojoSerializer<T> duplicate() {
+ TypeSerializer<Object>[] duplicateFieldSerializers = duplicateSerializers(fieldSerializers);
+ TypeSerializer<Object>[] duplicateRegisteredSerializers =
+ duplicateSerializers(registeredSerializers);
+
+ return new PojoSerializer<>(
+ clazz,
+ fields,
+ duplicateFieldSerializers,
+ new LinkedHashMap<>(registeredClasses),
+ duplicateRegisteredSerializers,
+ subclassSerializerCache.entrySet().stream()
+ .collect(
+ Collectors.toMap(Map.Entry::getKey, e -> e.getValue().duplicate())),
+ executionConfig);
+ }
+
+ @SuppressWarnings("unchecked")
+ private TypeSerializer<Object>[] duplicateSerializers(TypeSerializer<?>[] serializers) {
boolean stateful = false;
- TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
+ TypeSerializer<?>[] duplicateSerializers = new TypeSerializer[serializers.length];
- for (int i = 0; i < fieldSerializers.length; i++) {
- duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
- if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+ for (int i = 0; i < serializers.length; i++) {
+ duplicateSerializers[i] = serializers[i].duplicate();
+ if (duplicateSerializers[i] != serializers[i]) {
// at least one of them is stateful
stateful = true;
}
@@ -176,12 +195,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
if (!stateful) {
// as a small memory optimization, we can share the same object between instances
- duplicateFieldSerializers = fieldSerializers;
+ duplicateSerializers = serializers;
}
-
- // we must create a new instance, otherwise the subclassSerializerCache can create
- // concurrency problems
- return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
+ return (TypeSerializer<Object>[]) duplicateSerializers;
}
@Override
[flink] 02/02: [FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests
Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 695372cd0a5bece795d1248f32e46ec9748a40e8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Feb 3 12:25:07 2022 +0100
[FLINK-21752] Add a call to TypeSerializer#duplicate in migration tests
We should verify that a TypeSerializer can still be safely duplicated
even after migration. To do that, we duplicate a newly restored serializer
in the TypeSerializerUpgradeTestBase before we try to use it for
deserialization.
---
.../flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
index 9751e2d..f319528 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java
@@ -431,8 +431,13 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl
if (!isRestoreSerializer) {
TypeSerializerSnapshot<T> snapshot = writeAndThenReadSerializerSnapshot(serializer);
TypeSerializer<T> restoreSerializer = snapshot.restoreSerializer();
+ serializedData =
+ readAndThenWriteData(
+ serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+
+ TypeSerializer<T> duplicateSerializer = snapshot.restoreSerializer().duplicate();
readAndThenWriteData(
- serializedData, restoreSerializer, restoreSerializer, testDataMatcher);
+ serializedData, duplicateSerializer, duplicateSerializer, testDataMatcher);
}
}