You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/20 16:19:13 UTC

flink git commit: [FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.

Repository: flink
Updated Branches:
  refs/heads/master 321039f78 -> 7d0bfd51e


[FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.

This method did not create deep copies of registered or default serializer instances; as a result,
those serializer instances could accidentally be shared across different threads.

This closes #5880.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d0bfd51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d0bfd51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d0bfd51

Branch: refs/heads/master
Commit: 7d0bfd51e967eb1eb2c2869d79cb7cd13b8366dd
Parents: 321039f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Apr 19 15:10:07 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 20 18:18:30 2018 +0200

----------------------------------------------------------------------
 .../typeutils/runtime/kryo/KryoSerializer.java  | 60 ++++++++++---
 .../kryo/KryoSerializerConcurrencyTest.java     | 89 +++++++++++++++++++-
 2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d0bfd51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 7c97c5c..d8158aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -38,9 +32,15 @@ import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
 import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.lang3.exception.CloneFailedException;
 import org.objenesis.strategy.StdInstantiatorStrategy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,14 +140,37 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
 	 */
 	protected KryoSerializer(KryoSerializer<T> toCopy) {
-		defaultSerializers = toCopy.defaultSerializers;
-		defaultSerializerClasses = toCopy.defaultSerializerClasses;
 
-		kryoRegistrations = toCopy.kryoRegistrations;
+		this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+		this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+		this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+		this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+		// deep copy the serializer instances in defaultSerializers
+		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+			toCopy.defaultSerializers.entrySet()) {
 
-		type = toCopy.type;
-		if(type == null){
-			throw new NullPointerException("Type class cannot be null.");
+			this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+		}
+
+		// deep copy the serializer instances in kryoRegistrations
+		for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
+
+			KryoRegistration kryoRegistration = entry.getValue();
+
+			if (kryoRegistration.getSerializerDefinitionType() == KryoRegistration.SerializerDefinitionType.INSTANCE) {
+
+				ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializerInstance =
+					kryoRegistration.getSerializableSerializerInstance();
+
+				if (serializerInstance != null) {
+					kryoRegistration = new KryoRegistration(
+						kryoRegistration.getRegisteredClass(),
+						deepCopySerializer(serializerInstance));
+				}
+			}
+
+			this.kryoRegistrations.put(entry.getKey(), kryoRegistration);
 		}
 	}
 
@@ -568,6 +591,17 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		}
 	}
 
+	private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
+		ExecutionConfig.SerializableSerializer<? extends Serializer<?>> original) {
+		try {
+			return InstantiationUtil.clone(original, Thread.currentThread().getContextClassLoader());
+		} catch (IOException | ClassNotFoundException ex) {
+			throw new CloneFailedException(
+				"Could not clone serializer instance of class " + original.getClass(),
+				ex);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// For testing
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7d0bfd51/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
index ca81fd4..3680773 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
@@ -24,15 +24,21 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.BlockerSync;
 import org.apache.flink.core.testutils.CheckedThread;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 import static org.junit.Assert.fail;
 
 /**
  * This tests that the {@link KryoSerializer} properly fails when accessed by two threads
- * concurrently.
+ * concurrently and that Kryo serializers are properly duplicated to use them in different threads.
  *
  * <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
  * when running tests.
@@ -40,6 +46,50 @@ import static org.junit.Assert.fail;
 public class KryoSerializerConcurrencyTest {
 
 	@Test
+	public void testDuplicateSerializerWithDefaultSerializerClass() {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.addDefaultKryoSerializer(WrappedString.class, TestSerializer.class);
+		runDuplicateSerializerTest(executionConfig);
+	}
+
+	@Test
+	public void testDuplicateSerializerWithDefaultSerializerInstance() {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.addDefaultKryoSerializer(WrappedString.class, new TestSerializer());
+		runDuplicateSerializerTest(executionConfig);
+	}
+
+	@Test
+	public void testDuplicateSerializerWithRegisteredSerializerClass() {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerTypeWithKryoSerializer(WrappedString.class, TestSerializer.class);
+		runDuplicateSerializerTest(executionConfig);
+	}
+
+	@Test
+	public void testDuplicateSerializerWithRegisteredSerializerInstance() {
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.registerTypeWithKryoSerializer(WrappedString.class, new TestSerializer());
+		runDuplicateSerializerTest(executionConfig);
+	}
+
+	private void runDuplicateSerializerTest(ExecutionConfig executionConfig) {
+		final KryoSerializer<WrappedString> original = new KryoSerializer<>(WrappedString.class, executionConfig);
+		final KryoSerializer<WrappedString> duplicate = original.duplicate();
+
+		WrappedString testString = new WrappedString("test");
+
+		String copyWithOriginal = original.copy(testString).content;
+		String copyWithDuplicate = duplicate.copy(testString).content;
+
+		Assert.assertTrue(copyWithOriginal.startsWith(testString.content));
+		Assert.assertTrue(copyWithDuplicate.startsWith(testString.content));
+
+		// check that both serializer instances have appended a different identity hash
+		Assert.assertNotEquals(copyWithOriginal, copyWithDuplicate);
+	}
+
+	@Test
 	public void testConcurrentUseOfSerializer() throws Exception {
 		final KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 
@@ -92,4 +142,41 @@ public class KryoSerializerConcurrencyTest {
 			blocker.blockNonInterruptible();
 		}
 	}
+
+	/**
+	 * A test class that wraps a string.
+	 */
+	public static class WrappedString {
+
+		private final String content;
+
+		WrappedString(String content) {
+			this.content = content;
+		}
+
+		@Override
+		public String toString() {
+			return "WrappedString{" +
+				"content='" + content + '\'' +
+				'}';
+		}
+	}
+
+	/**
+	 * A test serializer for {@link WrappedString} that appends its identity hash.
+	 */
+	public static class TestSerializer extends Serializer<WrappedString> implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void write(Kryo kryo, Output output, WrappedString object) {
+			output.writeString(object.content);
+		}
+
+		@Override
+		public WrappedString read(Kryo kryo, Input input, Class<WrappedString> type) {
+			return new WrappedString(input.readString() + " " + System.identityHashCode(this));
+		}
+	}
 }