You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/12 16:47:24 UTC

[GitHub] asfgit closed pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

asfgit closed pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase
URL: https://github.com/apache/flink/pull/7061
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index a4d3839f66f..f1dd8fcbde9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -18,29 +18,17 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Objects;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -52,6 +40,18 @@
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @Internal
@@ -181,11 +181,13 @@ public boolean isImmutableType() {
 			}
 		}
 
-		if (stateful) {
-			return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig);
-		} else {
-			return this;
+		if (!stateful) {
+			// as a small memory optimization, we can share the same object between instances
+			duplicateFieldSerializers = fieldSerializers;
 		}
+
+		// we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems
+		return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig);
 	}
 
 	
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a4908f95e90..77099ef1cc9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -18,32 +18,39 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-
 import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Abstract test base for serializers.
  *
@@ -436,6 +443,34 @@ public void testNullability() {
 		}
 	}
 
+	@Test
+	public void testDuplicate() throws Exception {
+		final int numThreads = 10;
+		final TypeSerializer<T> serializer = getSerializer();
+		final CyclicBarrier startLatch = new CyclicBarrier(numThreads);
+		final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads);
+		Assert.assertEquals(serializer, serializer.duplicate());
+
+		T[] testData = getData();
+		final int testDataIterations = Math.max(1, 250 / testData.length);
+
+		for (int i = 0; i < numThreads; ++i) {
+			SerializerRunner<T> runner = new SerializerRunner<>(
+				startLatch,
+				serializer.duplicate(),
+				testData,
+				testDataIterations);
+
+			runner.start();
+			concurrentRunners.add(runner);
+		}
+
+		for (SerializerRunner<T> concurrentRunner : concurrentRunners) {
+			concurrentRunner.join();
+			concurrentRunner.checkResult();
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	protected void deepEquals(String message, T should, T is) {
@@ -526,6 +561,56 @@ public void write(DataInputView source, int numBytes) throws IOException {
 		}
 	}
 
+	/**
+	 * Runner to test serializer duplication via concurrency.
+	 * @param <T> type of the test elements.
+	 */
+	static class SerializerRunner<T> extends Thread {
+		final CyclicBarrier allReadyBarrier;
+		final TypeSerializer<T> serializer;
+		final T[] testData;
+		final int iterations;
+		Exception failure;
+
+		SerializerRunner(CyclicBarrier allReadyBarrier, TypeSerializer<T> serializer, T[] testData, int iterations) {
+			this.allReadyBarrier = allReadyBarrier;
+			this.serializer = serializer;
+			this.testData = testData;
+			this.iterations = iterations;
+			this.failure = null;
+		}
+
+		@Override
+		public void run() {
+			DataInputDeserializer dataInputDeserializer = new DataInputDeserializer();
+			DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
+			try {
+				allReadyBarrier.await();
+				for (int repeat = 0; repeat < iterations; ++repeat) {
+					for (T testItem : testData) {
+						serializer.serialize(testItem, dataOutputSerializer);
+						dataInputDeserializer.setBuffer(
+							dataOutputSerializer.getSharedBuffer(),
+							0,
+							dataOutputSerializer.length());
+						T serdeTestItem = serializer.deserialize(dataInputDeserializer);
+						T copySerdeTestItem = serializer.copy(serdeTestItem);
+						dataOutputSerializer.clear();
+						Preconditions.checkState(Objects.equals(testItem, copySerdeTestItem),
+							"Serialization/Deserialization cycle resulted in an object that are not equal to the original.");
+					}
+				}
+			} catch (Exception ex) {
+				failure = ex;
+			}
+		}
+
+		void checkResult() throws Exception {
+			if (failure != null) {
+				throw failure;
+			}
+		}
+	}
 
 	private static final class TestInputView extends DataInputStream implements DataInputView {
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services