You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/20 16:19:13 UTC
flink git commit: [FLINK-8836] Fix duplicate method in KryoSerializer
to perform deep copy of default/registered serializer instances.
Repository: flink
Updated Branches:
refs/heads/master 321039f78 -> 7d0bfd51e
[FLINK-8836] Fix duplicate method in KryoSerializer to perform deep copy of default/registered serializer instances.
This method did not create deep copies of registered or default serializer instances; as a result,
those serializer instances could accidentally be shared across different threads.
This closes #5880.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d0bfd51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d0bfd51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d0bfd51
Branch: refs/heads/master
Commit: 7d0bfd51e967eb1eb2c2869d79cb7cd13b8366dd
Parents: 321039f
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Apr 19 15:10:07 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Apr 20 18:18:30 2018 +0200
----------------------------------------------------------------------
.../typeutils/runtime/kryo/KryoSerializer.java | 60 ++++++++++---
.../kryo/KryoSerializerConcurrencyTest.java | 89 +++++++++++++++++++-
2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d0bfd51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 7c97c5c..d8158aa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -18,12 +18,6 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -38,9 +32,15 @@ import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.lang3.exception.CloneFailedException;
import org.objenesis.strategy.StdInstantiatorStrategy;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,14 +140,37 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
- defaultSerializers = toCopy.defaultSerializers;
- defaultSerializerClasses = toCopy.defaultSerializerClasses;
- kryoRegistrations = toCopy.kryoRegistrations;
+ this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
+ this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
+ this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
+ this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
+
+ // deep copy the serializer instances in defaultSerializers
+ for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
+ toCopy.defaultSerializers.entrySet()) {
- type = toCopy.type;
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
+ this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
+ }
+
+ // deep copy the serializer instances in kryoRegistrations
+ for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
+
+ KryoRegistration kryoRegistration = entry.getValue();
+
+ if (kryoRegistration.getSerializerDefinitionType() == KryoRegistration.SerializerDefinitionType.INSTANCE) {
+
+ ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializerInstance =
+ kryoRegistration.getSerializableSerializerInstance();
+
+ if (serializerInstance != null) {
+ kryoRegistration = new KryoRegistration(
+ kryoRegistration.getRegisteredClass(),
+ deepCopySerializer(serializerInstance));
+ }
+ }
+
+ this.kryoRegistrations.put(entry.getKey(), kryoRegistration);
}
}
@@ -568,6 +591,17 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
}
+ private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
+ ExecutionConfig.SerializableSerializer<? extends Serializer<?>> original) {
+ try {
+ return InstantiationUtil.clone(original, Thread.currentThread().getContextClassLoader());
+ } catch (IOException | ClassNotFoundException ex) {
+ throw new CloneFailedException(
+ "Could not clone serializer instance of class " + original.getClass(),
+ ex);
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// For testing
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7d0bfd51/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
index ca81fd4..3680773 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerConcurrencyTest.java
@@ -24,15 +24,21 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CheckedThread;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import static org.junit.Assert.fail;
/**
* This tests that the {@link KryoSerializer} properly fails when accessed by two threads
- * concurrently.
+ * concurrently and that Kryo serializers are properly duplicated to use them in different threads.
*
* <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
* when running tests.
@@ -40,6 +46,50 @@ import static org.junit.Assert.fail;
public class KryoSerializerConcurrencyTest {
@Test
+ public void testDuplicateSerializerWithDefaultSerializerClass() {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.addDefaultKryoSerializer(WrappedString.class, TestSerializer.class);
+ runDuplicateSerializerTest(executionConfig);
+ }
+
+ @Test
+ public void testDuplicateSerializerWithDefaultSerializerInstance() {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.addDefaultKryoSerializer(WrappedString.class, new TestSerializer());
+ runDuplicateSerializerTest(executionConfig);
+ }
+
+ @Test
+ public void testDuplicateSerializerWithRegisteredSerializerClass() {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.registerTypeWithKryoSerializer(WrappedString.class, TestSerializer.class);
+ runDuplicateSerializerTest(executionConfig);
+ }
+
+ @Test
+ public void testDuplicateSerializerWithRegisteredSerializerInstance() {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.registerTypeWithKryoSerializer(WrappedString.class, new TestSerializer());
+ runDuplicateSerializerTest(executionConfig);
+ }
+
+ private void runDuplicateSerializerTest(ExecutionConfig executionConfig) {
+ final KryoSerializer<WrappedString> original = new KryoSerializer<>(WrappedString.class, executionConfig);
+ final KryoSerializer<WrappedString> duplicate = original.duplicate();
+
+ WrappedString testString = new WrappedString("test");
+
+ String copyWithOriginal = original.copy(testString).content;
+ String copyWithDuplicate = duplicate.copy(testString).content;
+
+ Assert.assertTrue(copyWithOriginal.startsWith(testString.content));
+ Assert.assertTrue(copyWithDuplicate.startsWith(testString.content));
+
+ // check that both serializer instances have appended a different identity hash
+ Assert.assertNotEquals(copyWithOriginal, copyWithDuplicate);
+ }
+
+ @Test
public void testConcurrentUseOfSerializer() throws Exception {
final KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
@@ -92,4 +142,41 @@ public class KryoSerializerConcurrencyTest {
blocker.blockNonInterruptible();
}
}
+
+ /**
+ * A test class that wraps a string.
+ */
+ public static class WrappedString {
+
+ private final String content;
+
+ WrappedString(String content) {
+ this.content = content;
+ }
+
+ @Override
+ public String toString() {
+ return "WrappedString{" +
+ "content='" + content + '\'' +
+ '}';
+ }
+ }
+
+ /**
+ * A test serializer for {@link WrappedString} that appends its identity hash.
+ */
+ public static class TestSerializer extends Serializer<WrappedString> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void write(Kryo kryo, Output output, WrappedString object) {
+ output.writeString(object.content);
+ }
+
+ @Override
+ public WrappedString read(Kryo kryo, Input input, Class<WrappedString> type) {
+ return new WrappedString(input.readString() + " " + System.identityHashCode(this));
+ }
+ }
}