Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/04/19 13:16:34 UTC

[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/5880

    [FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.
    
    This method did not create deep copies of registered or default serializer instances and,
    as a result, those serializer instances could accidentally be shared across different threads.
    
    ## Brief change log
    
    This PR fixes a problem with the `duplicate` method of `KryoSerializer`: it now performs deep copies of the default and registered serializer objects.
    
    Without these deep copies, duplicated `KryoSerializer` instances used in different threads could share the same instance of a stateful internal serializer.
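The difference between the old shallow duplication and the fixed deep duplication can be sketched in plain Java. This is a minimal, self-contained illustration, not the PR's code: it does not use Flink or Kryo, `StatefulSerializer` is a hypothetical stand-in for a user serializer with mutable state, and the deep copy uses a Java serialization round-trip, which is an assumption about one way to copy a `Serializable` serializer rather than what the PR's `deepCopySerializer` necessarily does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializerCopyDemo {

    /** Hypothetical stand-in for a registered serializer that keeps mutable state. */
    public static class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        // Scratch buffer reused across calls: unsafe to share between threads.
        public final StringBuilder scratch = new StringBuilder();
    }

    /** Deep copy via a Java serialization round-trip (an assumption, not the PR's code). */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep copy " + original, e);
        }
    }

    /** Old behaviour: the duplicate reuses the very same map and serializer objects. */
    public static Map<Class<?>, StatefulSerializer> shallowDuplicate(
            Map<Class<?>, StatefulSerializer> src) {
        return src;
    }

    /** Fixed behaviour: a new map whose values are independent copies. */
    public static Map<Class<?>, StatefulSerializer> deepDuplicate(
            Map<Class<?>, StatefulSerializer> src) {
        Map<Class<?>, StatefulSerializer> copy = new LinkedHashMap<>(src.size());
        for (Map.Entry<Class<?>, StatefulSerializer> entry : src.entrySet()) {
            copy.put(entry.getKey(), deepCopy(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<Class<?>, StatefulSerializer> original = new LinkedHashMap<>();
        original.put(String.class, new StatefulSerializer());

        StatefulSerializer shared = shallowDuplicate(original).get(String.class);
        StatefulSerializer isolated = deepDuplicate(original).get(String.class);

        System.out.println("shallow copy shares instance: " + (shared == original.get(String.class)));
        System.out.println("deep copy shares instance: " + (isolated == original.get(String.class)));
    }
}
```

With the shallow version, two threads working on "duplicates" would both mutate the same `scratch` buffer; with the deep version each duplicate owns its own buffer.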
    
    ## Verifying this change
    
    This change adds tests and can be verified by running `KryoSerializerConcurrencyTest`.
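According to its javadoc, `KryoSerializerConcurrencyTest` checks that a `KryoSerializer` fails when two threads access it concurrently (with JVM assertions enabled). The general idea of such a fail-fast check can be sketched as follows. This is a hypothetical illustration, not Flink's implementation: `GuardedSerializer`, `enterExclusive`, `exitExclusive` and `secondAccessRejected` are invented names, and the check here runs unconditionally rather than only under `-ea`. Latches make the overlap deterministic: one thread holds a serializer while the calling thread tries to enter either the same instance or a separate one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentAccessCheckDemo {

    /** Hypothetical serializer that fails fast when a second thread enters while one is inside. */
    public static class GuardedSerializer {
        private final AtomicReference<Thread> currentThread = new AtomicReference<>();

        void enterExclusive() {
            Thread me = Thread.currentThread();
            if (!currentThread.compareAndSet(null, me)) {
                throw new IllegalStateException(
                        "Concurrent access: held by " + currentThread.get() + ", requested by " + me);
            }
        }

        void exitExclusive() {
            currentThread.set(null);
        }
    }

    /**
     * Lets a background thread hold one serializer, then tries to enter a serializer from the
     * calling thread. Returns true if that second access was rejected.
     */
    public static boolean secondAccessRejected(boolean shareInstance) {
        GuardedSerializer held = new GuardedSerializer();
        GuardedSerializer requested = shareInstance ? held : new GuardedSerializer();
        CountDownLatch heldInside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            held.enterExclusive();
            try {
                heldInside.countDown();
                release.await(); // keep holding until the other access attempt is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                held.exitExclusive();
            }
        });
        holder.start();

        try {
            heldInside.await();
            try {
                requested.enterExclusive();
                requested.exitExclusive();
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("shared instance rejected: " + secondAccessRejected(true));
        System.out.println("separate instances rejected: " + secondAccessRejected(false));
    }
}
```

Sharing one instance trips the check, while giving each thread its own duplicate does not, which is why `duplicate` must not hand out shared stateful parts.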
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (indirectly)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink FLINK-8836

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5880.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5880
    
----
commit 617ebd114b86d8f00d15e53dd649bb633fc717f9
Author: Stefan Richter <s....@...>
Date:   2018-04-19T13:10:07Z

    [FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.
    
    This method did not create deep copies of registered or default serializer instances and,
    as a result, those serializer instances could accidentally be shared across different threads.

----


---

[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182749960
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
     	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
     	 */
     	protected KryoSerializer(KryoSerializer<T> toCopy) {
    -		defaultSerializers = toCopy.defaultSerializers;
    -		defaultSerializerClasses = toCopy.defaultSerializerClasses;
     
    -		kryoRegistrations = toCopy.kryoRegistrations;
    +		this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +		this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +		this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +		this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +		// deep copy the serializer instances in defaultSerializers
    +		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +			toCopy.defaultSerializers.entrySet()) {
     
    -		type = toCopy.type;
    -		if(type == null){
    -			throw new NullPointerException("Type class cannot be null.");
    +			this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +		}
    +
    +		// deep copy the serializer instances in kryoRegistrations
    +		for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --
    
    One alternative approach to this loop (though I'm not sure it would be better) is to always make a copy of the `ExecutionConfig.SerializableSerializer` in the `buildKryoRegistrations` method when instantiating its corresponding `KryoRegistration`.
    See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537. There we could already make the copy when building the registrations.
    
    Then, when duplicating the `KryoSerializer`, duplicating the registrations would only be a matter of calling `buildKryoRegistrations` again with the execution config, because that method would handle stateful serializer registrations properly.
    IMO, this seems like a cleaner solution. What do you think?
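The alternative proposed above (copy each serializer while building the registrations, then duplicate by rebuilding from the config) can be sketched as follows. This is a hypothetical, Flink-free illustration: `UserSerializer`, `Config`, `Registration`, `buildRegistrations`, the `copy()` method and the class name `com.example.MyType` are all invented for the example and only mirror the roles of `ExecutionConfig.SerializableSerializer`, `ExecutionConfig`, `KryoRegistration` and `buildKryoRegistrations`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RebuildRegistrationsDemo {

    /** Hypothetical user serializer: immutable settings plus mutable per-use state. */
    public static class UserSerializer {
        final String charset;                              // configuration, safe to share
        final StringBuilder scratch = new StringBuilder(); // state, must not be shared

        UserSerializer(String charset) {
            this.charset = charset;
        }

        /** Returns an independent instance with the same settings and fresh state. */
        UserSerializer copy() {
            return new UserSerializer(charset);
        }
    }

    /** Hypothetical stand-in for the config holding the user's registered serializer instances. */
    public static class Config {
        final Map<String, UserSerializer> registeredSerializers = new LinkedHashMap<>();
    }

    /** Hypothetical registration entry, pairing a registered class name with its serializer. */
    public static final class Registration {
        final String className;
        final UserSerializer serializer;

        Registration(String className, UserSerializer serializer) {
            this.className = className;
            this.serializer = serializer;
        }
    }

    /** Copies every registered serializer while building, so each call returns fresh instances. */
    public static Map<String, Registration> buildRegistrations(Config config) {
        Map<String, Registration> registrations = new LinkedHashMap<>();
        for (Map.Entry<String, UserSerializer> entry : config.registeredSerializers.entrySet()) {
            registrations.put(entry.getKey(), new Registration(entry.getKey(), entry.getValue().copy()));
        }
        return registrations;
    }

    public static void main(String[] args) {
        Config config = new Config();
        config.registeredSerializers.put("com.example.MyType", new UserSerializer("UTF-8"));

        // The original serializer and its duplicate both rebuild from the same config.
        Map<String, Registration> original = buildRegistrations(config);
        Map<String, Registration> duplicate = buildRegistrations(config);

        UserSerializer a = original.get("com.example.MyType").serializer;
        UserSerializer b = duplicate.get("com.example.MyType").serializer;
        System.out.println("same instance: " + (a == b) + ", same settings: " + a.charset.equals(b.charset));
    }
}
```

The catch with this design is that duplication needs access to the config; as pointed out in the thread, the `KryoSerializer` copy constructor only receives the serializer being copied, not the `ExecutionConfig`.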
    



---

[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5880
  
    Thanks for the comments, will fix the names and then merge.


---

[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5880


---

[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182777472
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java ---
    @@ -24,21 +24,71 @@
     import org.apache.flink.core.testutils.BlockerSync;
     import org.apache.flink.core.testutils.CheckedThread;
     
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.Serializer;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +import org.junit.Assert;
     import org.junit.Test;
     
     import java.io.IOException;
    +import java.io.Serializable;
     
     import static org.junit.Assert.fail;
     
     /**
      * This tests that the {@link KryoSerializer} properly fails when accessed by two threads
    - * concurrently.
    + * concurrently and that Kryo serializers are properly duplicated to use them in different threads.
      *
      * <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
      * when running tests.
      */
     public class KryoSerializerConcurrencyTest {
     
    +	@Test
    +	public void testDuplicateSerializerWithDefaultSerializerClass() {
    --- End diff --
    
    Test names are mixed up: this test and the next one should be swapped.


---

[GitHub] flink pull request #5880: [FLINK-8836] Fix duplicate method in KryoSerialize...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182776735
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
     	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
     	 */
     	protected KryoSerializer(KryoSerializer<T> toCopy) {
    -		defaultSerializers = toCopy.defaultSerializers;
    -		defaultSerializerClasses = toCopy.defaultSerializerClasses;
     
    -		kryoRegistrations = toCopy.kryoRegistrations;
    +		this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +		this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +		this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +		this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +		// deep copy the serializer instances in defaultSerializers
    +		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +			toCopy.defaultSerializers.entrySet()) {
     
    -		type = toCopy.type;
    -		if(type == null){
    -			throw new NullPointerException("Type class cannot be null.");
    +			this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +		}
    +
    +		// deep copy the serializer instances in kryoRegistrations
    +		for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --
    
    The problem is that we don't have the `ExecutionConfig` in the copy constructor.


---

[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/5880
  
    +1. Will this PR also go into 1.4.x?


---


[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5880
  
    Yes, +1 after that is fixed.


---

[GitHub] flink issue #5880: [FLINK-8836] Fix duplicate method in KryoSerializer to pe...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5880
  
    @sihuazhou I think it can and should also go into 1.4.
    
    @aljoscha is that a +1 once I have fixed the method names?


---