You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/01 15:52:48 UTC

flink git commit: [FLINK-6775] [state] Duplicate StateDescriptor's serializer

Repository: flink
Updated Branches:
  refs/heads/master b9bdef434 -> 88ffad272


[FLINK-6775] [state] Duplicate StateDescriptor's serializer

Duplicate the TypeSerializer before returning it from the StateDescriptor. That way
we ensure that StateDescriptors can be shared by multiple threads.

Add test case for AggregatingStateDescriptor

Fix OperatorStateBackendTest#testCorrectClassLoaderUsedOnSnapshot

This closes #4025.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88ffad27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88ffad27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88ffad27

Branch: refs/heads/master
Commit: 88ffad272eea5865cc43bf44a8980754d8711178
Parents: b9bdef4
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 31 13:59:55 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 1 17:50:45 2017 +0200

----------------------------------------------------------------------
 .../api/common/state/ListStateDescriptor.java   |   2 +-
 .../flink/api/common/state/StateDescriptor.java |   2 +-
 .../state/AggregatingStateDescriptorTest.java   |  64 +++++++++
 .../common/state/ListStateDescriptorTest.java   |  38 ++++++
 .../common/state/MapStateDescriptorTest.java    |  48 +++++++
 .../state/ReducingStateDescriptorTest.java      |  35 ++++-
 .../common/state/ValueStateDescriptorTest.java  |  34 ++++-
 .../runtime/state/OperatorStateBackendTest.java | 134 ++++++++++++++++---
 8 files changed, 333 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index ea28ad2..a50e25d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -95,7 +95,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 			throw new IllegalStateException();
 		}
 
-		return ((ListSerializer<T>)serializer).getElementSerializer();
+		return ((ListSerializer<T>) rawSerializer).getElementSerializer();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 073d748..199f8d5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -174,7 +174,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 */
 	public TypeSerializer<T> getSerializer() {
 		if (serializer != null) {
-			return serializer;
+			return serializer.duplicate();
 		} else {
 			throw new IllegalStateException("Serializer not yet initialized.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
new file mode 100644
index 0000000..1b27ebd
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertNotSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AggregatingStateDescriptorTest extends TestLogger {
+
+	/**
+	 * FLINK-6775
+	 *
+	 * Tests that the returned serializer is duplicated. This allows to
+	 * share the state descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerDuplication() {
+		TypeSerializer<Long> serializer = mock(TypeSerializer.class);
+		when(serializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
+			@Override
+			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+
+		AggregateFunction<Long, Long, Long> aggregatingFunction = mock(AggregateFunction.class);
+
+		AggregatingStateDescriptor<Long, Long, Long> descr = new AggregatingStateDescriptor<>(
+			"foobar",
+			aggregatingFunction,
+			serializer);
+
+		TypeSerializer<Long> serializerA = descr.getSerializer();
+		TypeSerializer<Long> serializerB = descr.getSerializer();
+
+		// check that the retrieved serializers are not the same
+		assertNotSame(serializerA, serializerB);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index b9d9a8c..0b230ad 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -28,11 +28,18 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ListStateDescriptorTest {
 	
@@ -101,4 +108,35 @@ public class ListStateDescriptorTest {
 		assertNotNull(copy.getElementSerializer());
 		assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
 	}
+
+	/**
+	 * FLINK-6775
+	 *
+	 * Tests that the returned serializer is duplicated. This allows to
+	 * share the state descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerDuplication() {
+		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
+		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
+			@Override
+			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+
+		ListStateDescriptor<String> descr = new ListStateDescriptor<>("foobar", statefulSerializer);
+
+		TypeSerializer<String> serializerA = descr.getElementSerializer();
+		TypeSerializer<String> serializerB = descr.getElementSerializer();
+
+		// check that the retrieved serializers are not the same
+		assertNotSame(serializerA, serializerB);
+
+		TypeSerializer<List<String>> listSerializerA = descr.getSerializer();
+		TypeSerializer<List<String>> listSerializerB = descr.getSerializer();
+
+		assertNotSame(listSerializerA, listSerializerB);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 9d1b105..d710911 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -29,11 +29,18 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MapStateDescriptorTest {
 	
@@ -112,4 +119,45 @@ public class MapStateDescriptorTest {
 		assertNotNull(copy.getValueSerializer());
 		assertEquals(LongSerializer.INSTANCE, copy.getValueSerializer());
 	}
+
+	/**
+	 * FLINK-6775
+	 *
+	 * Tests that the returned serializer is duplicated. This allows to
+	 * share the state descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerDuplication() {
+		TypeSerializer<String> keySerializer = mock(TypeSerializer.class);
+		TypeSerializer<Long> valueSerializer = mock(TypeSerializer.class);
+		when(keySerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
+			@Override
+			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+		when(valueSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
+			@Override
+			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+
+		MapStateDescriptor<String, Long> descr = new MapStateDescriptor<>("foobar", keySerializer, valueSerializer);
+
+		TypeSerializer<String> keySerializerA = descr.getKeySerializer();
+		TypeSerializer<String> keySerializerB = descr.getKeySerializer();
+		TypeSerializer<Long> valueSerializerA = descr.getValueSerializer();
+		TypeSerializer<Long> valueSerializerB = descr.getValueSerializer();
+
+		// check that we did not retrieve the same serializers
+		assertNotSame(keySerializerA, keySerializerB);
+		assertNotSame(valueSerializerA, valueSerializerB);
+
+		TypeSerializer<Map<String, Long>> serializerA = descr.getSerializer();
+		TypeSerializer<Map<String, Long>> serializerB = descr.getSerializer();
+
+		assertNotSame(serializerA, serializerB);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 0bac930..aec7140 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -27,16 +27,21 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class ReducingStateDescriptorTest {
+public class ReducingStateDescriptorTest extends TestLogger {
 	
 	@Test
 	public void testValueStateDescriptorEagerSerializer() throws Exception {
@@ -101,5 +106,33 @@ public class ReducingStateDescriptorTest {
 		assertNotNull(copy.getSerializer());
 		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
 	}
+
+	/**
+	 * FLINK-6775
+	 *
+	 * Tests that the returned serializer is duplicated. This allows to
+	 * share the state descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerDuplication() {
+		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
+		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
+			@Override
+			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+
+		ReduceFunction<String> reducer = mock(ReduceFunction.class);
+
+		ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
+
+		TypeSerializer<String> serializerA = descr.getSerializer();
+		TypeSerializer<String> serializerB = descr.getSerializer();
+
+		// check that the retrieved serializers are not the same
+		assertNotSame(serializerA, serializerB);
+	}
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 674f7e3..e434e01 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -27,16 +27,22 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.File;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
-public class ValueStateDescriptorTest {
+public class ValueStateDescriptorTest extends TestLogger {
 	
 	@Test
 	public void testValueStateDescriptorEagerSerializer() throws Exception {
@@ -130,4 +136,30 @@ public class ValueStateDescriptorTest {
 		assertNotNull(copy.getSerializer());
 		assertEquals(serializer, copy.getSerializer());
 	}
+
+	/**
+	 * FLINK-6775
+	 *
+	 * Tests that the returned serializer is duplicated. This allows to
+	 * share the state descriptor.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testSerializerDuplication() {
+		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
+		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
+			@Override
+			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
+				return mock(TypeSerializer.class);
+			}
+		});
+
+		ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("foobar", statefulSerializer);
+
+		TypeSerializer<String> serializerA = descr.getSerializer();
+		TypeSerializer<String> serializerB = descr.getSerializer();
+
+		// check that the retrieved serializers are not the same
+		assertNotSame(serializerA, serializerB);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88ffad27/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index d44f6c9..0fff501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,11 +21,15 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,12 +37,10 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableL
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.Preconditions;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -55,6 +57,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -65,11 +68,10 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(TypeSerializerSerializationUtil.class)
+@PrepareForTest({TypeSerializerSerializationUtil.class, IntSerializer.class})
 public class OperatorStateBackendTest {
 
 	private final ClassLoader classLoader = getClass().getClassLoader();
@@ -222,21 +224,11 @@ public class OperatorStateBackendTest {
 		final Environment env = createMockEnvironment();
 		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
 
-		// mock serializer which tests that on copy, the correct classloader is used as the context classloader
-		TypeSerializer<Integer> mockSerializer = mock(TypeSerializer.class);
-		when(mockSerializer.copy(Matchers.any(Integer.class))).thenAnswer(new Answer<Object>() {
-			@Override
-			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-				Assert.assertEquals(env.getUserClassLoader(), Thread.currentThread().getContextClassLoader());
-				return null;
-			}
-		});
-		// return actual serializers / config snapshots so that the snapshot proceeds properly
-		when(mockSerializer.duplicate()).thenReturn(IntSerializer.INSTANCE);
-		when(mockSerializer.snapshotConfiguration()).thenReturn(IntSerializer.INSTANCE.snapshotConfiguration());
+		AtomicInteger copyCounter = new AtomicInteger(0);
+		TypeSerializer<Integer> serializer = new VerifyingIntSerializer(env.getUserClassLoader(), copyCounter);
 
 		// write some state
-		ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", mockSerializer);
+		ListStateDescriptor<Integer> stateDescriptor = new ListStateDescriptor<>("test", serializer);
 		ListState<Integer> listState = operatorStateBackend.getListState(stateDescriptor);
 
 		listState.add(42);
@@ -246,8 +238,110 @@ public class OperatorStateBackendTest {
 			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint());
 		FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
-		// make sure that the method of interest is called
-		verify(mockSerializer).copy(Matchers.any(Integer.class));
+		// make sure that the copy method has been called
+		assertTrue(copyCounter.get() > 0);
+	}
+
+	/**
+	 * Int serializer which verifies that the given classloader is set for the copy operation
+	 */
+	private static final class VerifyingIntSerializer extends TypeSerializer<Integer> {
+
+		private static final long serialVersionUID = -5344563614550163898L;
+
+		private transient ClassLoader classLoader;
+		private transient AtomicInteger atomicInteger;
+
+		private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
+			this.classLoader = Preconditions.checkNotNull(classLoader);
+			this.atomicInteger = Preconditions.checkNotNull(atomicInteger);
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			// otherwise the copy method won't be called for the deepCopy operation
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<Integer> duplicate() {
+			return this;
+		}
+
+		@Override
+		public Integer createInstance() {
+			return 0;
+		}
+
+		@Override
+		public Integer copy(Integer from) {
+			assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+			atomicInteger.incrementAndGet();
+			return IntSerializer.INSTANCE.copy(from);
+		}
+
+		@Override
+		public Integer copy(Integer from, Integer reuse) {
+			assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+			atomicInteger.incrementAndGet();
+			return IntSerializer.INSTANCE.copy(from, reuse);
+		}
+
+		@Override
+		public int getLength() {
+			return IntSerializer.INSTANCE.getLength();
+		}
+
+		@Override
+		public void serialize(Integer record, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.serialize(record, target);
+		}
+
+		@Override
+		public Integer deserialize(DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(source);
+		}
+
+		@Override
+		public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(reuse, source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			assertEquals(classLoader, Thread.currentThread().getContextClassLoader());
+			atomicInteger.incrementAndGet();
+			IntSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof VerifyingIntSerializer) {
+				return ((VerifyingIntSerializer)obj).canEqual(this);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof VerifyingIntSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return IntSerializer.INSTANCE.snapshotConfiguration();
+		}
+
+		@Override
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
+		}
 	}
 
 	@Test