You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/24 05:10:15 UTC

[1/2] flink git commit: [FLINK-6018] Add tests for KryoSerializer restore with registered types

Repository: flink
Updated Branches:
  refs/heads/master 68289b1a5 -> 09164cf23


http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index faa9314..22bb715 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,11 +36,15 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -61,9 +67,11 @@ import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -90,6 +98,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import org.junit.rules.ExpectedException;
+
 
 /**
  * Generic tests for the partitioned state part of {@link AbstractStateBackend}.
@@ -97,6 +107,9 @@ import static org.mockito.Mockito.verify;
 @SuppressWarnings("serial")
 public abstract class StateBackendTestBase<B extends AbstractStateBackend> extends TestLogger {
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	protected abstract B getStateBackend() throws Exception;
 
 	protected CheckpointStreamFactory createStreamFactory() throws Exception {
@@ -171,21 +184,478 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serialize (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serialize (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig()
+				.addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		pojoType.createSerializer(env.getExecutionConfig());
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+		assertTrue(state instanceof InternalValueState);
+		((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serialize (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serialize (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	public void testBackendUsesRegisteredKryoSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		env.getExecutionConfig()
+				.registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serialize (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serialize (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+		assertTrue(state instanceof InternalValueState);
+		((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serialize (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serialize (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with the state type registered (no specific serializer)
+	 *
+	 * This test should not fail, because de- / serialization of the state should both be performed with Kryo's default
+	 * {@link com.esotericsoftware.kryo.serializers.FieldSerializer}.
+	 */
+	@Test
+	public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ====================================== restore snapshot  ======================================
+
+		env.getExecutionConfig().registerKryoType(TestPojo.class);
+		
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		backend.setCurrentKey(1);
+		assertEquals(state.value(), new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		assertEquals(state.value(), new TestPojo("u2", 2));
+
+		backend.dispose();
+	}
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with a default serializer for the state type
+	 *
+	 * <p> The default serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+	 * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+	 *
+	 * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11));
+
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+		// on the second restore, since the custom serializer will be used for
+		// deserialization, we expect the deliberate failure to be thrown
+		expectedException.expect(ExpectedKryoTestException.class);
+
+		// state backends that eagerly deserialize (such as the memory state backend) will fail here
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		// state backends that lazily deserialize (such as RocksDB) will fail here
+		state.value();
+	}
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with a specific serializer for the state type
+	 *
+	 * <p> The specific serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+	 * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+	 *
+	 * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+	 */
+	@Test
+	public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11));
+
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+		// on the second restore, since the custom serializer will be used for
+		// deserialization, we expect the deliberate failure to be thrown
+		expectedException.expect(ExpectedKryoTestException.class);
+
+		// state backends that eagerly deserialize (such as the memory state backend) will fail here
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		// state backends that lazily deserialize (such as RocksDB) will fail here
+		state.value();
+	}
+
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
 		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
 		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
+		// this is only available after the backend initialized the serializer
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		
 		// some modifications to the state
 		backend.setCurrentKey(1);
 		assertNull(state.value());
@@ -276,16 +746,17 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		final ValueStateDescriptor<String> kvId =
 			new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
 		final TypeSerializer<Integer> namespaceSerializer =
 			IntSerializer.INSTANCE;
-		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
 		final ValueState<String> state = backend
 			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
 
+		// this is only available after the backend initialized the serializer
+		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
 		@SuppressWarnings("unchecked")
 		final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
 
@@ -390,9 +861,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
 		ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
 
-		desc1.initializeSerializerUnlessSet(new ExecutionConfig());
-		desc2.initializeSerializerUnlessSet(new ExecutionConfig());
-
 		ValueState<String> state1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc1);
 		ValueState<Integer> state2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc2);
 
@@ -459,7 +927,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -499,463 +966,443 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testListState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testListState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			Joiner joiner = Joiner.on(",");
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("2");
-			backend.setCurrentKey(1);
-			assertEquals("1", joiner.join(state.get()));
-			assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		// read the element serializer only now: it is only available after the backend initialized it (presumably in getPartitionedState(...) above)
+		TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		Joiner joiner = Joiner.on(",");
+		// modify the state for keys 1 and 2, checking both state.get() and the serialized list each time
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("2");
+		backend.setCurrentKey(1);
+		assertEquals("1", joiner.join(state.get()));
+		assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.add("u1");
-			backend.setCurrentKey(2);
-			state.add("u2");
-			backend.setCurrentKey(3);
-			state.add("u3");
+		// draw a first snapshot, taken before the u1/u2/u3 additions below
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications after the first snapshot, including a new key 3
+		backend.setCurrentKey(1);
+		state.add("u1");
+		backend.setCurrentKey(2);
+		state.add("u2");
+		backend.setCurrentKey(3);
+		state.add("u3");
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", joiner.join(state.get()));
-			assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", joiner.join(state.get()));
-			assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(3);
-			assertEquals("u3", joiner.join(state.get()));
-			assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		// draw a second snapshot that also contains the u1/u2/u3 additions
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate that the live state of the original backend reflects all modifications
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", joiner.join(state.get()));
+		assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", joiner.join(state.get()));
+		assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(3);
+		assertEquals("u3", joiner.join(state.get()));
+		assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", joiner.join(restored1.get()));
-			assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2", joiner.join(restored1.get()));
-			assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", joiner.join(restored1.get()));
+		assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2", joiner.join(restored1.get()));
+		assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", joiner.join(restored2.get()));
-			assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", joiner.join(restored2.get()));
-			assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(3);
-			assertEquals("u3", joiner.join(restored2.get()));
-			assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", joiner.join(restored2.get()));
+		assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", joiner.join(restored2.get()));
+		assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(3);
+		assertEquals("u3", joiner.join(restored2.get()));
+		assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testReducingState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testReducingState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("2");
-			backend.setCurrentKey(1);
-			assertEquals("1", state.get());
-			assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// read the value serializer only now: it is only available after the backend initialized it (presumably in getPartitionedState(...) above)
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// modify the state for keys 1 and 2, checking both state.get() and the serialized value each time
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("2");
+		backend.setCurrentKey(1);
+		assertEquals("1", state.get());
+		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.add("u1");
-			backend.setCurrentKey(2);
-			state.add("u2");
-			backend.setCurrentKey(3);
-			state.add("u3");
+		// draw a first snapshot, taken before the u1/u2/u3 additions below
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications after the first snapshot, including a new key 3
+		backend.setCurrentKey(1);
+		state.add("u1");
+		backend.setCurrentKey(2);
+		state.add("u2");
+		backend.setCurrentKey(3);
+		state.add("u3");
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", state.get());
-			assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", state.get());
-			assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("u3", state.get());
-			assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// draw a second snapshot that also contains the u1/u2/u3 additions
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate that the live state of the original backend holds the reduced values of all additions
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", state.get());
+		assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", state.get());
+		assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("u3", state.get());
+		assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", restored1.get());
-			assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2", restored1.get());
-			assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.get());
+		assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.get());
+		assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", restored2.get());
-			assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", restored2.get());
-			assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("u3", restored2.get());
-			assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", restored2.get());
+		assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", restored2.get());
+		assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("u3", restored2.get());
+		assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testFoldingState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testFoldingState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
-					"Fold-Initial:",
-					new AppendingFold(),
-					String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
+				"Fold-Initial:",
+				new AppendingFold(),
+				String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add(1);
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add(2);
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,1", state.get());
-			assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// read the value serializer only now: it is only available after the backend initialized it (presumably in getPartitionedState(...) above)
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// modify the state for keys 1 and 2, checking both state.get() and the serialized value each time
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add(1);
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add(2);
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,1", state.get());
+		assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.clear();
-			state.add(101);
-			backend.setCurrentKey(2);
-			state.add(102);
-			backend.setCurrentKey(3);
-			state.add(103);
+		// draw a first snapshot, taken before the 101/102/103 modifications below
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications: key 1 is cleared and re-folded, key 2 is extended, key 3 is new
+		backend.setCurrentKey(1);
+		state.clear();
+		state.add(101);
+		backend.setCurrentKey(2);
+		state.add(102);
+		backend.setCurrentKey(3);
+		state.add(103);
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,101", state.get());
-			assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2,102", state.get());
-			assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("Fold-Initial:,103", state.get());
-			assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// draw a second snapshot that also contains the 101/102/103 modifications
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate that the live state of the original backend reflects all modifications
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,101", state.get());
+		assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2,102", state.get());
+		assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("Fold-Initial:,103", state.get());
+		assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,1", restored1.get());
-			assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2", restored1.get());
-			assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot1.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,1", restored1.get());
+		assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2", restored1.get());
+		assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			@SuppressWarnings("unchecked")
-			FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it; discard snapshot2 here (snapshot1 was already discarded above)
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,101", restored2.get());
-			assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2,102", restored2.get());
-			assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("Fold-Initial:,103", restored2.get());
-			assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		@SuppressWarnings("unchecked")
+		FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,101", restored2.get());
+		assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2,102", restored2.get());
+		assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("Fold-Initial:,103", restored2.get());
+		assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testMapState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testMapState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
-			TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get(1));
-			assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			state.put(1, "1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get(2));
-			assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			state.put(2, "2");
-			backend.setCurrentKey(1);
-			assertTrue(state.contains(1));
-			assertEquals("1", state.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		// these are only available after the backend initialized the serializer
+		TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
+		TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// some modifications to the state
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get(1));
+		assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		state.put(1, "1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get(2));
+		assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		state.put(2, "2");
+		backend.setCurrentKey(1);
+		assertTrue(state.contains(1));
+		assertEquals("1", state.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.put(1, "101");
-			backend.setCurrentKey(2);
-			state.put(102, "102");
-			backend.setCurrentKey(3);
-			state.put(103, "103");
-			state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
+		// draw a snapshot
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.put(1, "101");
+		backend.setCurrentKey(2);
+		state.put(102, "102");
+		backend.setCurrentKey(3);
+		state.put(103, "103");
+		state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("101", state.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
-					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("102", state.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
-					getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(3);
-			assertTrue(state.contains(103));
-			assertEquals("103", state.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-					getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-
-			List<Integer> keys = new ArrayList<>();
-			for (Integer key : state.keys()) {
-				keys.add(key);
-			}
-			List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
-			assertEquals(keys.size(), expectedKeys.size());
-			keys.removeAll(expectedKeys);
-			assertTrue(keys.isEmpty());
-
-			List<String> values = new ArrayList<>();
-			for (String value : state.values()) {
-				values.add(value);
-			}
-			List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
-			assertEquals(values.size(), expectedValues.size());
-			values.removeAll(expectedValues);
-			assertTrue(values.isEmpty());
+		// draw another snapshot
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.clear();
-			backend.setCurrentKey(2);
-			state.remove(102);
-			backend.setCurrentKey(3);
-			final String updateSuffix = "_updated";
-			Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
-			while (iterator.hasNext()) {
-				Map.Entry<Integer, String> entry = iterator.next();
-				if (entry.getValue().length() != 4) {
-					iterator.remove();
-				} else {
-					entry.setValue(entry.getValue() + updateSuffix);
-				}
-			}
+		// validate the original state
+		backend.setCurrentKey(1);
+		assertEquals("101", state.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
+				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("102", state.get(102));
+		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
+				getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(3);
+		assertTrue(state.contains(103));
+		assertEquals("103", state.get(103));
+		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+				getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+
+		List<Integer> keys = new ArrayList<>();
+		for (Integer key : state.keys()) {
+			keys.add(key);
+		}
+		List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
+		assertEquals(keys.size(), expectedKeys.size());
+		keys.removeAll(expectedKeys);
+		assertTrue(keys.isEmpty());
+
+		List<String> values = new ArrayList<>();
+		for (String value : state.values()) {
+			values.add(value);
+		}
+		List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
+		assertEquals(values.size(), expectedValues.size());
+		values.removeAll(expectedValues);
+		assertTrue(values.isEmpty());
 
-			// validate the state
-			backend.setCurrentKey(1);
-			backend.setCurrentKey(2);
-			assertFalse(state.contains(102));
-			backend.setCurrentKey(3);
-			for (Map.Entry<Integer, String> entry : state.entries()) {
-				assertEquals(4 + updateSuffix.length(), entry.getValue().length());
-				assertTrue(entry.getValue().endsWith(updateSuffix));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.clear();
+		backend.setCurrentKey(2);
+		state.remove(102);
+		backend.setCurrentKey(3);
+		final String updateSuffix = "_updated";
+		Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
+		while (iterator.hasNext()) {
+			Map.Entry<Integer, String> entry = iterator.next();
+			if (entry.getValue().length() != 4) {
+				iterator.remove();
+			} else {
+				entry.setValue(entry.getValue() + updateSuffix);
 			}
+		}
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate the state
+		backend.setCurrentKey(1);
+		backend.setCurrentKey(2);
+		assertFalse(state.contains(102));
+		backend.setCurrentKey(3);
+		for (Map.Entry<Integer, String> entry : state.entries()) {
+			assertEquals(4 + updateSuffix.length(), entry.getValue().length());
+			assertTrue(entry.getValue().endsWith(updateSuffix));
+		}
 
-			MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", restored1.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-					getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2", restored1.get(2));
-			assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
-					getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+				getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.get(2));
+		assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
+				getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
-			@SuppressWarnings("unchecked")
-			MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("101", restored2.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
-					getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("102", restored2.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
-					getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("103", restored2.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-					getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		@SuppressWarnings("unchecked")
+		MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("101", restored2.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
+				getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("102", restored2.get(102));
+		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
+				getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("103", restored2.get(103));
+		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+				getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
+		backend.dispose();
 	}
 
 	/**
@@ -966,7 +1413,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -991,7 +1437,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1015,7 +1460,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ReducingState<String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1043,8 +1487,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		FoldingStateDescriptor<Integer, String> kvId =
 				new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);
 
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
 		FoldingState<Integer, String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1071,7 +1513,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ListState<String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1098,7 +1539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", String.class, String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		MapState<String, String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1142,7 +1582,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				new DummyEnvironment("test", 1, 0));
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1224,7 +1663,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1430,7 +1868,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1458,7 +1895,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		try {
 			backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1501,7 +1937,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					Integer.class,
 					-1);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ValueState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1524,7 +1959,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			// ListState
 			ListStateDescriptor<Integer> desc = new ListStateDescriptor<>("list-state", Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ListState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1552,7 +1986,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				}
 			}, Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ReducingState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1580,7 +2013,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				}
 			}, Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			FoldingState<Integer, Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1602,7 +2034,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			// MapState
 			MapStateDescriptor<Integer, String> desc = new MapStateDescriptor<>("map-state", Integer.class, String.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			MapState<Integer, String> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1935,4 +2366,107 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
 		}
 	}
+
+	private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws Exception {
+		if(!snapshotRunnableFuture.isDone()) {
+			Thread runner = new Thread(snapshotRunnableFuture);
+			runner.start();
+		}
+		return snapshotRunnableFuture.get();
+	}
+
+	private static class TestPojo implements Serializable {
+		private String strField;
+		private Integer intField;
+
+		public TestPojo() {}
+
+		public TestPojo(String strField, Integer intField) {
+			this.strField = strField;
+			this.intField = intField;
+		}
+
+		public String getStrField() {
+			return strField;
+		}
+
+		public void setStrField(String strField) {
+			this.strField = strField;
+		}
+
+		public Integer getIntField() {
+			return intField;
+		}
+
+		public void setIntField(Integer intField) {
+			this.intField = intField;
+		}
+
+		@Override
+		public String toString() {
+			return "TestPojo{" +
+					"strField='" + strField + '\'' +
+					", intField=" + intField +
+					'}';
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			TestPojo testPojo = (TestPojo) o;
+
+			if (!strField.equals(testPojo.strField)) return false;
+			return intField.equals(testPojo.intField);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = strField.hashCode();
+			result = 31 * result + intField.hashCode();
+			return result;
+		}
+	}
+
+	/**
+	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
+	 */
+	private static class ExpectedKryoTestException extends RuntimeException {}
+
+	/**
+	 * Kryo {@code Serializer} that throws an expected exception. We use this to ensure
+	 * that the state backend correctly uses a specified Kryo serializer.
+	 */
+	public static class ExceptionThrowingTestSerializer extends JavaSerializer {
+		@Override
+		public void write(Kryo kryo, Output output, Object object) {
+			throw new ExpectedKryoTestException();
+		}
+
+		@Override
+		public Object read(Kryo kryo, Input input, Class type) {
+			throw new ExpectedKryoTestException();
+		}
+	}
+
+	/**
+	 * Our custom version of {@link JavaSerializer} for checking whether restore with a registered
+	 * serializer works when no serializer was previously registered.
+	 *
+	 * <p>This {@code Serializer} can only be used for writing, not for reading. With this we
+	 * verify that state that was serialized without a registered {@code Serializer} is in fact
+	 * not restored with a {@code Serializer} that was later registered.
+	 */
+	public static class CustomKryoTestSerializer extends JavaSerializer {
+		@Override
+		public void write(Kryo kryo, Output output, Object object) {
+			super.write(kryo, output, object);
+		}
+
+		@Override
+		public Object read(Kryo kryo, Input input, Class type) {
+			throw new ExpectedKryoTestException();
+		}
+	}
 }


[2/2] flink git commit: [FLINK-6018] Add tests for KryoSerializer restore with registered types

Posted by tz...@apache.org.
[FLINK-6018] Add tests for KryoSerializer restore with registered types

This commit also renames isCompatibleWith() to canRestoreFrom() to make
the method asymmetric, because in the case of KryoSerializer we
can restore from state that was stored using no registered
types/serializers, while the other way around is not possible.

This closes #3534.
This closes #3603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09164cf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09164cf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09164cf2

Branch: refs/heads/master
Commit: 09164cf2388888bc2f92f0ca63bb1f15283e895c
Parents: 68289b1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Mar 16 15:17:05 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:34:03 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |    2 +-
 .../api/common/typeutils/TypeSerializer.java    |    2 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |   16 +
 .../AbstractKeyedCEPPatternOperator.java        |    2 +-
 .../state/AbstractKeyedStateBackend.java        |    4 +-
 .../state/DefaultOperatorStateBackend.java      |    6 +-
 .../state/RegisteredBackendStateMetaInfo.java   |    6 +-
 .../state/heap/HeapKeyedStateBackend.java       |    2 +-
 .../runtime/state/StateBackendTestBase.java     | 1346 ++++++++++++------
 9 files changed, 967 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5b72e03..2ce527f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -821,7 +821,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				descriptor.getSerializer());
 
 		if (stateInfo != null) {
-			if (newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+			if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
 				stateInfo.f1 = newMetaInfo;
 				return stateInfo.f0;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index ac7fbc8..6edaec6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -161,7 +161,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 
 	public abstract int hashCode();
 
-	public boolean isCompatibleWith(TypeSerializer<?> other) {
+	public boolean canRestoreFrom(TypeSerializer<?> other) {
 		return equals(other);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 44c952a..cba0c84 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -383,4 +383,20 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		checkKryoInitialized();
 		return this.kryo;
 	}
+
+	@Override
+	public boolean canRestoreFrom(TypeSerializer<?> other) {
+		if (other instanceof KryoSerializer) {
+			KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+
+			// we cannot include the Serializers here because they don't implement the equals method
+			return other.canEqual(this) &&
+					type == otherKryo.type &&
+					(registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) &&
+					(registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
+					(defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty());
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b6d57cd..3e18660 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -404,7 +404,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 
 		@Override
-		public boolean isCompatibleWith(TypeSerializer<?> other) {
+		public boolean canRestoreFrom(TypeSerializer<?> other) {
 			return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1f2f4a2..e6e7b23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -277,7 +277,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		}
 
 		if (!stateDescriptor.isSerializerInitialized()) {
-			throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); 
+			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 		}
 
 		InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
@@ -355,8 +355,6 @@ public abstract class AbstractKeyedStateBackend<K>
 
 		checkNotNull(namespace, "Namespace");
 
-		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-
 		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
 			lastState.setCurrentNamespace(namespace);
 			return (S) lastState;

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 71cccae..ca7cb48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -134,8 +134,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					"Incompatible assignment mode. Provided: " + mode + ", expected: " +
 							partitionableListState.getAssignmentMode());
 			Preconditions.checkState(
-					partitionableListState.getPartitionStateSerializer().
-							isCompatibleWith(stateDescriptor.getElementSerializer()),
+					stateDescriptor.getElementSerializer().
+							canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
 					"Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
 							", found: " + partitionableListState.getPartitionStateSerializer());
 		}
@@ -258,7 +258,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 						registeredStates.put(listState.getName(), listState);
 					} else {
-						Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith(
+						Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom(
 								stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
 								listState.getPartitionStateSerializer() + " is not compatible with " +
 								stateMetaInfo.getStateSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
index 80bdacd..0d4b3c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -74,7 +74,7 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 		return stateSerializer;
 	}
 
-	public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
+	public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) {
 
 		if (this == other) {
 			return true;
@@ -94,8 +94,8 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			return false;
 		}
 
-		return (stateSerializer.isCompatibleWith(other.stateSerializer)) &&
-				(namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+		return (stateSerializer.canRestoreFrom(other.stateSerializer)) &&
+				(namespaceSerializer.canRestoreFrom(other.namespaceSerializer)
 						// we also check if there is just a migration proxy that should be replaced by any real serializer
 						|| other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index f3e4ec6..46ec5c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -141,7 +141,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			stateTable = newStateTable(newMetaInfo);
 			stateTables.put(stateName, stateTable);
 		} else {
-			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+			if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) {
 				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
 						stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
 			}