You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:25 UTC

[14/50] [abbrv] flink git commit: [FLINK-6018] Add tests for KryoSerializer restore with registered types

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index faa9314..22bb715 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -34,11 +36,15 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -61,9 +67,11 @@ import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -90,6 +98,8 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import org.junit.rules.ExpectedException;
+
 
 /**
  * Generic tests for the partitioned state part of {@link AbstractStateBackend}.
@@ -97,6 +107,9 @@ import static org.mockito.Mockito.verify;
 @SuppressWarnings("serial")
 public abstract class StateBackendTestBase<B extends AbstractStateBackend> extends TestLogger {
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	protected abstract B getStateBackend() throws Exception;
 
 	protected CheckpointStreamFactory createStreamFactory() throws Exception {
@@ -171,21 +184,478 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	@Test
 	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serializes (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serializes (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig()
+				.addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		pojoType.createSerializer(env.getExecutionConfig());
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+		assertTrue(state instanceof InternalValueState);
+		((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serializes (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serializes (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	public void testBackendUsesRegisteredKryoSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		env.getExecutionConfig()
+				.registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serializes (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serializes (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
+		assertTrue(state instanceof InternalValueState);
+		((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+		// we will be expecting ExpectedKryoTestException to be thrown,
+		// because the ExceptionThrowingTestSerializer should be used
+		int numExceptions = 0;
+
+		backend.setCurrentKey(1);
+
+		try {
+			// backends that eagerly serializes (such as RocksDB) will fail here
+			state.update(new TestPojo("u1", 1));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		try {
+			// backends that lazily serializes (such as memory state backend) will fail here
+			runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		} catch (ExpectedKryoTestException e) {
+			numExceptions++;
+		} catch (Exception e) {
+			if (e.getCause() instanceof ExpectedKryoTestException) {
+				numExceptions++;
+			} else {
+				throw e;
+			}
+		}
+
+		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+	}
+
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with the state type registered (no specific serializer)
+	 *
+	 * This test should not fail, because de- / serialization of the state should noth be performed with Kryo's default
+	 * {@link com.esotericsoftware.kryo.serializers.FieldSerializer}.
+	 */
+	@Test
+	public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ====================================== restore snapshot  ======================================
+
+		env.getExecutionConfig().registerKryoType(TestPojo.class);
+		
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		backend.setCurrentKey(1);
+		assertEquals(state.value(), new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		assertEquals(state.value(), new TestPojo("u2", 2));
+
+		backend.dispose();
+	}
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with a default serializer for the state type
+	 *
+	 * <p> The default serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+	 * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+	 *
+	 * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11));
+
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
+
+		// cast because our test serializer is not typed to TestPojo
+		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+
+		// on the second restore, since the custom serializer will be used for
+		// deserialization, we expect the deliberate failure to be thrown
+		expectedException.expect(ExpectedKryoTestException.class);
+
+		// state backends that eagerly deserializes (such as the memory state backend) will fail here
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		// state backends that lazily deserializes (such as RocksDB) will fail here
+		state.value();
+	}
+
+	/**
+	 * Verify state restore resilience when:
+	 *  - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
+	 *  - restored with a specific serializer for the state type
+	 *
+	 * <p> The specific serializer used on restore is {@link CustomKryoTestSerializer}, which deliberately
+	 * fails only on deserialization. We use the deliberate deserialization failure to acknowledge test success.
+	 *
+	 * @throws Exception expects {@link ExpectedKryoTestException} to be thrown.
+	 */
+	@Test
+	public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		Environment env = new DummyEnvironment("test", 1, 0);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+
+		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+
+		// make sure that we are in fact using the KryoSerializer
+		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+
+		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.update(new TestPojo("u1", 1));
+
+		backend.setCurrentKey(2);
+		state.update(new TestPojo("u2", 2));
+
+		KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+
+		snapshot.discardState();
+
+		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+		// initializeSerializerUnlessSet would not pick up our new config
+		kvId = new ValueStateDescriptor<>("id", pojoType);
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+
+		// update to test state backends that eagerly serialize, such as RocksDB
+		state.update(new TestPojo("u1", 11));
+
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(
+				682375462378L,
+				2,
+				streamFactory,
+				CheckpointOptions.forFullCheckpoint()));
+
+		backend.dispose();
+
+		// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
+
+		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+
+		// on the second restore, since the custom serializer will be used for
+		// deserialization, we expect the deliberate failure to be thrown
+		expectedException.expect(ExpectedKryoTestException.class);
+
+		// state backends that eagerly deserializes (such as the memory state backend) will fail here
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+
+		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+		backend.setCurrentKey(1);
+		// state backends that lazily deserializes (such as RocksDB) will fail here
+		state.value();
+	}
+
+
+	@Test
+	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
 		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 		@SuppressWarnings("unchecked")
 		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
+		// this is only available after the backend initialized the serializer
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		
 		// some modifications to the state
 		backend.setCurrentKey(1);
 		assertNull(state.value());
@@ -276,16 +746,17 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 		final ValueStateDescriptor<String> kvId =
 			new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		final TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
 		final TypeSerializer<Integer> namespaceSerializer =
 			IntSerializer.INSTANCE;
-		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
 		final ValueState<String> state = backend
 			.getPartitionedState(namespace, IntSerializer.INSTANCE, kvId);
 
+		// this is only available after the backend initialized the serializer
+		final TypeSerializer<String> valueSerializer = kvId.getSerializer();
+
 		@SuppressWarnings("unchecked")
 		final InternalKvState<Integer> kvState = (InternalKvState<Integer>) state;
 
@@ -390,9 +861,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
 		ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
 
-		desc1.initializeSerializerUnlessSet(new ExecutionConfig());
-		desc2.initializeSerializerUnlessSet(new ExecutionConfig());
-
 		ValueState<String> state1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc1);
 		ValueState<Integer> state2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc2);
 
@@ -459,7 +927,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<Long> kvId = new ValueStateDescriptor<>("id", LongSerializer.INSTANCE, 42L);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -499,463 +966,443 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testListState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testListState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			Joiner joiner = Joiner.on(",");
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("2");
-			backend.setCurrentKey(1);
-			assertEquals("1", joiner.join(state.get()));
-			assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		// this is only available after the backend initialized the serializer
+		TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		Joiner joiner = Joiner.on(",");
+		// some modifications to the state
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("2");
+		backend.setCurrentKey(1);
+		assertEquals("1", joiner.join(state.get()));
+		assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.add("u1");
-			backend.setCurrentKey(2);
-			state.add("u2");
-			backend.setCurrentKey(3);
-			state.add("u3");
+		// draw a snapshot
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.add("u1");
+		backend.setCurrentKey(2);
+		state.add("u2");
+		backend.setCurrentKey(3);
+		state.add("u3");
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", joiner.join(state.get()));
-			assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", joiner.join(state.get()));
-			assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(3);
-			assertEquals("u3", joiner.join(state.get()));
-			assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		// draw another snapshot
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate the original state
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", joiner.join(state.get()));
+		assertEquals("1,u1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", joiner.join(state.get()));
+		assertEquals("2,u2", joiner.join(getSerializedList(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(3);
+		assertEquals("u3", joiner.join(state.get()));
+		assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", joiner.join(restored1.get()));
-			assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2", joiner.join(restored1.get()));
-			assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		ListState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", joiner.join(restored1.get()));
+		assertEquals("1", joiner.join(getSerializedList(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2", joiner.join(restored1.get()));
+		assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-			ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", joiner.join(restored2.get()));
-			assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", joiner.join(restored2.get()));
-			assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
-			backend.setCurrentKey(3);
-			assertEquals("u3", joiner.join(restored2.get()));
-			assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		ListState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", joiner.join(restored2.get()));
+		assertEquals("1,u1", joiner.join(getSerializedList(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", joiner.join(restored2.get()));
+		assertEquals("2,u2", joiner.join(getSerializedList(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+		backend.setCurrentKey(3);
+		assertEquals("u3", joiner.join(restored2.get()));
+		assertEquals("u3", joiner.join(getSerializedList(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testReducingState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testReducingState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		ReducingState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add("2");
-			backend.setCurrentKey(1);
-			assertEquals("1", state.get());
-			assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// this is only available after the backend initialized the serializer
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// some modifications to the state
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertNull(getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertNull(getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add("2");
+		backend.setCurrentKey(1);
+		assertEquals("1", state.get());
+		assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.add("u1");
-			backend.setCurrentKey(2);
-			state.add("u2");
-			backend.setCurrentKey(3);
-			state.add("u3");
+		// draw a snapshot
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.add("u1");
+		backend.setCurrentKey(2);
+		state.add("u2");
+		backend.setCurrentKey(3);
+		state.add("u3");
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", state.get());
-			assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", state.get());
-			assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("u3", state.get());
-			assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// draw another snapshot
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate the original state
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", state.get());
+		assertEquals("1,u1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", state.get());
+		assertEquals("2,u2", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("u3", state.get());
+		assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", restored1.get());
-			assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2", restored1.get());
-			assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		ReducingState<String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.get());
+		assertEquals("1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.get());
+		assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1,u1", restored2.get());
-			assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2,u2", restored2.get());
-			assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("u3", restored2.get());
-			assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		ReducingState<String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("1,u1", restored2.get());
+		assertEquals("1,u1", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2,u2", restored2.get());
+		assertEquals("2,u2", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("u3", restored2.get());
+		assertEquals("u3", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testFoldingState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testFoldingState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
-					"Fold-Initial:",
-					new AppendingFold(),
-					String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		FoldingStateDescriptor<Integer, String> kvId = new FoldingStateDescriptor<>("id",
+				"Fold-Initial:",
+				new AppendingFold(),
+				String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		FoldingState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add(1);
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get());
-			assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			state.add(2);
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,1", state.get());
-			assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// this is only available after the backend initialized the serializer
+		TypeSerializer<String> valueSerializer = kvId.getSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// some modifications to the state
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add(1);
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get());
+		assertEquals(null, getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		state.add(2);
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,1", state.get());
+		assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.clear();
-			state.add(101);
-			backend.setCurrentKey(2);
-			state.add(102);
-			backend.setCurrentKey(3);
-			state.add(103);
+		// draw a snapshot
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.clear();
+		state.add(101);
+		backend.setCurrentKey(2);
+		state.add(102);
+		backend.setCurrentKey(3);
+		state.add(103);
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,101", state.get());
-			assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2,102", state.get());
-			assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("Fold-Initial:,103", state.get());
-			assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		// draw another snapshot
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate the original state
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,101", state.get());
+		assertEquals("Fold-Initial:,101", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2,102", state.get());
+		assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("Fold-Initial:,103", state.get());
+		assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,1", restored1.get());
-			assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2", restored1.get());
-			assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		FoldingState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot1.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,1", restored1.get());
+		assertEquals("Fold-Initial:,1", getSerializedValue(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2", restored1.get());
+		assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-			@SuppressWarnings("unchecked")
-			FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("Fold-Initial:,101", restored2.get());
-			assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("Fold-Initial:,2,102", restored2.get());
-			assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("Fold-Initial:,103", restored2.get());
-			assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		@SuppressWarnings("unchecked")
+		FoldingState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("Fold-Initial:,101", restored2.get());
+		assertEquals("Fold-Initial:,101", getSerializedValue(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("Fold-Initial:,2,102", restored2.get());
+		assertEquals("Fold-Initial:,2,102", getSerializedValue(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("Fold-Initial:,103", restored2.get());
+		assertEquals("Fold-Initial:,103", getSerializedValue(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
+
+		backend.dispose();
 	}
 
 	@Test
 	@SuppressWarnings("unchecked,rawtypes")
-	public void testMapState() {
-		try {
-			CheckpointStreamFactory streamFactory = createStreamFactory();
-			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+	public void testMapState() throws Exception {
+		CheckpointStreamFactory streamFactory = createStreamFactory();
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
-			MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+		MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class);
 
-			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
-			TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
+		TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 
-			MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
+		MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> kvState = (InternalKvState<VoidNamespace>) state;
 
-			// some modifications to the state
-			backend.setCurrentKey(1);
-			assertEquals(null, state.get(1));
-			assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			state.put(1, "1");
-			backend.setCurrentKey(2);
-			assertEquals(null, state.get(2));
-			assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			state.put(2, "2");
-			backend.setCurrentKey(1);
-			assertTrue(state.contains(1));
-			assertEquals("1", state.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		// these are only available after the backend initialized the serializer
+		TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer();
+		TypeSerializer<String> userValueSerializer = kvId.getValueSerializer();
 
-			// draw a snapshot
-			KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// some modifications to the state
+		backend.setCurrentKey(1);
+		assertEquals(null, state.get(1));
+		assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		state.put(1, "1");
+		backend.setCurrentKey(2);
+		assertEquals(null, state.get(2));
+		assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		state.put(2, "2");
+		backend.setCurrentKey(1);
+		assertTrue(state.contains(1));
+		assertEquals("1", state.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.put(1, "101");
-			backend.setCurrentKey(2);
-			state.put(102, "102");
-			backend.setCurrentKey(3);
-			state.put(103, "103");
-			state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
+		// draw a snapshot
+		KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// draw another snapshot
-			KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.put(1, "101");
+		backend.setCurrentKey(2);
+		state.put(102, "102");
+		backend.setCurrentKey(3);
+		state.put(103, "103");
+		state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }});
 
-			// validate the original state
-			backend.setCurrentKey(1);
-			assertEquals("101", state.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
-					getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("102", state.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
-					getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(3);
-			assertTrue(state.contains(103));
-			assertEquals("103", state.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-					getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-
-			List<Integer> keys = new ArrayList<>();
-			for (Integer key : state.keys()) {
-				keys.add(key);
-			}
-			List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
-			assertEquals(keys.size(), expectedKeys.size());
-			keys.removeAll(expectedKeys);
-			assertTrue(keys.isEmpty());
-
-			List<String> values = new ArrayList<>();
-			for (String value : state.values()) {
-				values.add(value);
-			}
-			List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
-			assertEquals(values.size(), expectedValues.size());
-			values.removeAll(expectedValues);
-			assertTrue(values.isEmpty());
+		// draw another snapshot
+		KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint()));
 
-			// make some more modifications
-			backend.setCurrentKey(1);
-			state.clear();
-			backend.setCurrentKey(2);
-			state.remove(102);
-			backend.setCurrentKey(3);
-			final String updateSuffix = "_updated";
-			Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
-			while (iterator.hasNext()) {
-				Map.Entry<Integer, String> entry = iterator.next();
-				if (entry.getValue().length() != 4) {
-					iterator.remove();
-				} else {
-					entry.setValue(entry.getValue() + updateSuffix);
-				}
-			}
+		// validate the original state
+		backend.setCurrentKey(1);
+		assertEquals("101", state.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }},
+				getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("102", state.get(102));
+		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }},
+				getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(3);
+		assertTrue(state.contains(103));
+		assertEquals("103", state.get(103));
+		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+				getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+
+		List<Integer> keys = new ArrayList<>();
+		for (Integer key : state.keys()) {
+			keys.add(key);
+		}
+		List<Integer> expectedKeys = new ArrayList<Integer>() {{ add(103); add(1031); add(1032); }};
+		assertEquals(keys.size(), expectedKeys.size());
+		keys.removeAll(expectedKeys);
+		assertTrue(keys.isEmpty());
+
+		List<String> values = new ArrayList<>();
+		for (String value : state.values()) {
+			values.add(value);
+		}
+		List<String> expectedValues = new ArrayList<String>() {{ add("103"); add("1031"); add("1032"); }};
+		assertEquals(values.size(), expectedValues.size());
+		values.removeAll(expectedValues);
+		assertTrue(values.isEmpty());
 
-			// validate the state
-			backend.setCurrentKey(1);
-			backend.setCurrentKey(2);
-			assertFalse(state.contains(102));
-			backend.setCurrentKey(3);
-			for (Map.Entry<Integer, String> entry : state.entries()) {
-				assertEquals(4 + updateSuffix.length(), entry.getValue().length());
-				assertTrue(entry.getValue().endsWith(updateSuffix));
+		// make some more modifications
+		backend.setCurrentKey(1);
+		state.clear();
+		backend.setCurrentKey(2);
+		state.remove(102);
+		backend.setCurrentKey(3);
+		final String updateSuffix = "_updated";
+		Iterator<Map.Entry<Integer, String>> iterator = state.iterator();
+		while (iterator.hasNext()) {
+			Map.Entry<Integer, String> entry = iterator.next();
+			if (entry.getValue().length() != 4) {
+				iterator.remove();
+			} else {
+				entry.setValue(entry.getValue() + updateSuffix);
 			}
+		}
 
-			backend.dispose();
-			// restore the first snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
-			snapshot1.discardState();
+		// validate the state
+		backend.setCurrentKey(1);
+		backend.setCurrentKey(2);
+		assertFalse(state.contains(102));
+		backend.setCurrentKey(3);
+		for (Map.Entry<Integer, String> entry : state.entries()) {
+			assertEquals(4 + updateSuffix.length(), entry.getValue().length());
+			assertTrue(entry.getValue().endsWith(updateSuffix));
+		}
 
-			MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
+		backend.dispose();
+		// restore the first snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1);
+		snapshot1.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("1", restored1.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
-					getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("2", restored1.get(2));
-			assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
-					getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState1 = (InternalKvState<VoidNamespace>) restored1;
 
-			backend.dispose();
-			// restore the second snapshot and validate it
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
-			snapshot2.discardState();
+		backend.setCurrentKey(1);
+		assertEquals("1", restored1.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }},
+				getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("2", restored1.get(2));
+		assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }},
+				getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
-			@SuppressWarnings("unchecked")
-			MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-			@SuppressWarnings("unchecked")
-			InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
+		backend.dispose();
+		// restore the second snapshot and validate it
+		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2);
+		snapshot2.discardState();
 
-			backend.setCurrentKey(1);
-			assertEquals("101", restored2.get(1));
-			assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
-					getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(2);
-			assertEquals("102", restored2.get(102));
-			assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
-					getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
-			backend.setCurrentKey(3);
-			assertEquals("103", restored2.get(103));
-			assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
-					getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		@SuppressWarnings("unchecked")
+		MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+		@SuppressWarnings("unchecked")
+		InternalKvState<VoidNamespace> restoredKvState2 = (InternalKvState<VoidNamespace>) restored2;
 
-			backend.dispose();
-		} catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		backend.setCurrentKey(1);
+		assertEquals("101", restored2.get(1));
+		assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }},
+				getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(2);
+		assertEquals("102", restored2.get(102));
+		assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }},
+				getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
+		backend.setCurrentKey(3);
+		assertEquals("103", restored2.get(103));
+		assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }},
+				getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer));
 
+		backend.dispose();
 	}
 
 	/**
@@ -966,7 +1413,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -991,7 +1437,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, "Hello");
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1015,7 +1460,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ReducingState<String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1043,8 +1487,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		FoldingStateDescriptor<Integer, String> kvId =
 				new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class);
 
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
 		FoldingState<Integer, String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1071,7 +1513,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ListState<String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1098,7 +1539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		MapStateDescriptor<String, String> kvId = new MapStateDescriptor<>("id", String.class, String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		MapState<String, String> state = backend.getPartitionedState(
 				VoidNamespace.INSTANCE,
@@ -1142,7 +1582,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				new DummyEnvironment("test", 1, 0));
 
 		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1224,7 +1663,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1430,7 +1868,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		ValueState<IntValue> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
@@ -1458,7 +1895,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
 		try {
 			backend.getPartitionedState(null, VoidNamespaceSerializer.INSTANCE, kvId);
@@ -1501,7 +1937,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 					Integer.class,
 					-1);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ValueState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1524,7 +1959,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			// ListState
 			ListStateDescriptor<Integer> desc = new ListStateDescriptor<>("list-state", Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ListState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1552,7 +1986,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				}
 			}, Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			ReducingState<Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1580,7 +2013,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				}
 			}, Integer.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			FoldingState<Integer, Integer> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1602,7 +2034,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			// MapState
 			MapStateDescriptor<Integer, String> desc = new MapStateDescriptor<>("map-state", Integer.class, String.class);
 			desc.setQueryable("my-query");
-			desc.initializeSerializerUnlessSet(new ExecutionConfig());
 
 			MapState<Integer, String> state = backend.getPartitionedState(
 					VoidNamespace.INSTANCE,
@@ -1935,4 +2366,107 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer);
 		}
 	}
+
+	private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> snapshotRunnableFuture) throws Exception {
+		if(!snapshotRunnableFuture.isDone()) {
+			Thread runner = new Thread(snapshotRunnableFuture);
+			runner.start();
+		}
+		return snapshotRunnableFuture.get();
+	}
+
+	private static class TestPojo implements Serializable {
+		private String strField;
+		private Integer intField;
+
+		public TestPojo() {}
+
+		public TestPojo(String strField, Integer intField) {
+			this.strField = strField;
+			this.intField = intField;
+		}
+
+		public String getStrField() {
+			return strField;
+		}
+
+		public void setStrField(String strField) {
+			this.strField = strField;
+		}
+
+		public Integer getIntField() {
+			return intField;
+		}
+
+		public void setIntField(Integer intField) {
+			this.intField = intField;
+		}
+
+		@Override
+		public String toString() {
+			return "TestPojo{" +
+					"strField='" + strField + '\'' +
+					", intField=" + intField +
+					'}';
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) return true;
+			if (o == null || getClass() != o.getClass()) return false;
+
+			TestPojo testPojo = (TestPojo) o;
+
+			if (!strField.equals(testPojo.strField)) return false;
+			return intField.equals(testPojo.intField);
+		}
+
+		@Override
+		public int hashCode() {
+			int result = strField.hashCode();
+			result = 31 * result + intField.hashCode();
+			return result;
+		}
+	}
+
+	/**
+	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
+	 */
+	private static class ExpectedKryoTestException extends RuntimeException {}
+
+	/**
+	 * Kryo {@code Serializer} that throws an expected exception. We use this to ensure
+	 * that the state backend correctly uses a specified Kryo serializer.
+	 */
+	public static class ExceptionThrowingTestSerializer extends JavaSerializer {
+		@Override
+		public void write(Kryo kryo, Output output, Object object) {
+			throw new ExpectedKryoTestException();
+		}
+
+		@Override
+		public Object read(Kryo kryo, Input input, Class type) {
+			throw new ExpectedKryoTestException();
+		}
+	}
+
+	/**
+	 * Our custom version of {@link JavaSerializer} for checking whether restore with a registered
+	 * serializer works when no serializer was previously registered.
+	 *
+	 * <p>This {@code Serializer} can only be used for writing, not for reading. With this we
+	 * verify that state that was serialized without a registered {@code Serializer} is in fact
+	 * not restored with a {@code Serializer} that was later registered.
+	 */
+	public static class CustomKryoTestSerializer extends JavaSerializer {
+		@Override
+		public void write(Kryo kryo, Output output, Object object) {
+			super.write(kryo, output, object);
+		}
+
+		@Override
+		public Object read(Kryo kryo, Input input, Class type) {
+			throw new ExpectedKryoTestException();
+		}
+	}
 }