Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/04/19 13:16:34 UTC
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/5880
[FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.
This method did not create deep copies of registered or default serializer instances and
as a result those serializer instances could accidentally be shared across different threads.
## Brief change log
This PR fixes a problem with the `duplicate` method of `KryoSerializer`. We now perform deep copies of default and registered serializer objects.
Otherwise, if we use duplicated `KryoSerializer` instances in different threads, some of their internal serializers might still be shared instances of stateful objects, which is not thread-safe.
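As an illustration of the idea described above, here is a minimal, self-contained sketch of deep-copying a map of serializer instances so that a duplicate never shares a stateful object with its source. This is not the code from the PR: `DeepCopySketch`, `StatefulSerializer`, `deepCopy`, and `deepCopyAll` are hypothetical names, and the round trip through Java serialization is only one assumed way to obtain an independent copy of a `Serializable` object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code.
final class DeepCopySketch {

    /** Stand-in for a stateful, serializable serializer instance. */
    static final class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int callCount; // mutable state that is unsafe to share across threads
    }

    /** Creates an independent copy via a Java serialization round trip. */
    @SuppressWarnings("unchecked")
    static <S extends Serializable> S deepCopy(S original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (S) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep copy serializer instance.", e);
        }
    }

    /** Copies the map itself and every value in it, preserving insertion order. */
    static Map<Class<?>, StatefulSerializer> deepCopyAll(
            Map<Class<?>, StatefulSerializer> source) {
        Map<Class<?>, StatefulSerializer> copy = new LinkedHashMap<>(source.size());
        for (Map.Entry<Class<?>, StatefulSerializer> entry : source.entrySet()) {
            copy.put(entry.getKey(), deepCopy(entry.getValue()));
        }
        return copy;
    }
}
```

Assigning the source map by reference, as the old copy constructor did, leaves both `KryoSerializer` instances pointing at the same serializer objects; copying each value is what makes the duplicate safe to hand to another thread.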
## Verifying this change
This change added tests and can be verified by running `KryoSerializerConcurrencyTest`.
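For readers unfamiliar with this kind of test, the following is a rough sketch of how concurrent use of a shared instance can be detected. It is an assumption for illustration only and not the mechanism used by `KryoSerializerConcurrencyTest`; `ConcurrentAccessGuardSketch`, `enter`, and `exit` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Flink code.
final class ConcurrentAccessGuardSketch {

    // Thread currently inside the guarded section, or null if none.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    /**
     * Marks the start of a guarded section. Throws if the section is already
     * occupied. The guard is deliberately not reentrant.
     */
    void enter() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("Concurrent access to a shared instance detected.");
        }
    }

    /** Marks the end of a guarded section. */
    void exit() {
        owner.set(null);
    }
}
```

A stateful serializer wrapped with such a guard fails fast when two threads use the same instance at once, whereas properly duplicated instances each carry their own guard and never trip it.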
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (indirectly)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink FLINK-8836
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5880.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5880
----
commit 617ebd114b86d8f00d15e53dd649bb633fc717f9
Author: Stefan Richter <s....@...>
Date: 2018-04-19T13:10:07Z
[FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.
This method did not create deep copies of registered or default serializer instances and
as a result those serializer instances could accidentally be shared across different threads.
----
---
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182749960
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
--- End diff --
One alternative approach to this loop (though I'm not sure it would be better) is that, in the `buildKryoRegistrations` method, we always make a copy of the `ExecutionConfig.SerializableSerializer` when instantiating its corresponding `KryoRegistration`.
See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. Here we can make a copy already when building the registrations.
Then, when duplicating the `KryoSerializer`, for duplicating the registrations, this would only be a matter of calling `buildKryoRegistrations` again from the execution config because that method would handle stateful serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?
---
[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5880
Thanks for the comments, will fix the names and then merge.
---
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5880
---
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182777472
--- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java ---
@@ -24,21 +24,71 @@
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CheckedThread;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import static org.junit.Assert.fail;
/**
* This tests that the {@link KryoSerializer} properly fails when accessed by two threads
- * concurrently.
+ * concurrently and that Kryo serializers are properly duplicated to use them in different threads.
*
* <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
* when running tests.
*/
public class KryoSerializerConcurrencyTest {
+ @Test
+ public void testDuplicateSerializerWithDefaultSerializerClass() {
--- End diff --
The test names are mixed up; this one and the next one should be switched.
---
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182776735
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
--- End diff --
The problem is that we don't have the `ExecutionConfig` in the copy constructor.
---
[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/5880
+1, Will this PR also get into 1.4.x?
---
[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...
Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5880#discussion_r182750453
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
@@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
--- End diff --
One alternative approach to this loop (though I'm not sure it would be better) is that, in the `buildKryoRegistrations` method, we always make a copy of the `ExecutionConfig.SerializableSerializer` when instantiating its corresponding `KryoRegistration`.
See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. Here we can make a copy already when building the registrations.
Then, when duplicating the `KryoSerializer`, for duplicating the registrations, this would only be a matter of calling `buildKryoRegistrations` again with the execution config because that method would handle stateful serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?
---
[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...
Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/5880
Yes, +1 after that is fixed.
---
[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/5880
@sihuazhou I think it can and should also go into 1.4.
@aljoscha is that a +1 once I have fixed the method names?
---