You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/23 09:22:48 UTC

[flink] 03/05: [hotfix] [tests] Remove redundant outdated migration tests in StateBackendTestBase

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef4784835b2f3805dcfe3d4ef0abf5cbf83e9ba1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jan 18 14:02:13 2019 +0100

    [hotfix] [tests] Remove redundant outdated migration tests in StateBackendTestBase
    
    Those tests were implemented before we had the state schema evolution
    feature in Flink 1.7, and were implemented using the old legacy
    serialization compatibility APIs. Moreoever, the behaviours that they
    test are already more comprehensively covered in
    StateBackendMigrationTestBase.
---
 .../runtime/state/MemoryStateBackendTest.java      | 140 ------
 .../runtime/state/OperatorStateBackendTest.java    |  59 ---
 .../flink/runtime/state/StateBackendTestBase.java  | 473 ---------------------
 3 files changed, 672 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 3b157ba..2ac1ea1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -18,34 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
-import org.apache.flink.util.ExceptionUtils;
 
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.concurrent.RunnableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 /**
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
@@ -82,123 +59,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	@Test
 	public void testMapStateRestoreWithWrongSerializers() {}
 
-
-
-	/**
-	 * Verifies that the operator state backend fails with appropriate error and message if
-	 * previous serializer can not be restored.
-	 */
-	@Test
-	public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		DummyEnvironment env = new DummyEnvironment();
-		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
-		OperatorStateBackend operatorStateBackend =
-			abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
-
-		// write some state
-		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
-		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
-
-		listState1.add(42);
-		listState1.add(4711);
-
-		listState2.add(7);
-		listState2.add(13);
-		listState2.add(23);
-
-		listState3.add(17);
-		listState3.add(18);
-		listState3.add(19);
-		listState3.add(20);
-
-		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-
-		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
-		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
-
-		try {
-
-			operatorStateBackend.close();
-			operatorStateBackend.dispose();
-
-			env = new DummyEnvironment(
-				new ArtificialCNFExceptionThrowingClassLoader(
-					getClass().getClassLoader(),
-					Collections.singleton(JavaSerializer.class.getName())));
-
-			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
-				env,
-				"testOperator");
-
-			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
-
-			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		} finally {
-			stateHandle.discardState();
-		}
-	}
-
-	/**
-	 * Verifies that memory-backed keyed state backend fails with appropriate error and message if
-	 * previous serializer can not be restored.
-	 */
-	@Test
-	public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
-		assertEquals(0, heapBackend.numKeyValueStateEntries());
-
-		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-		// write some state
-		backend.setCurrentKey(0);
-		state.update("hello");
-		state.update("ciao");
-
-		KeyedStateHandle snapshot = runSnapshot(
-			((HeapKeyedStateBackend<Integer>) backend).snapshot(
-				682375462378L,
-				2,
-				streamFactory,
-				CheckpointOptions.forCheckpointWithDefaultLocation()),
-			sharedStateRegistry);
-
-		backend.dispose();
-
-		// ========== restore snapshot ==========
-
-		try {
-			restoreKeyedBackend(
-				IntSerializer.INSTANCE,
-				snapshot,
-				new DummyEnvironment(
-					new ArtificialCNFExceptionThrowingClassLoader(
-						getClass().getClassLoader(),
-						Collections.singleton(StringSerializer.class.getName()))));
-
-			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		}
-	}
-
 	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 178671b..3918303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -36,14 +36,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
@@ -52,7 +49,6 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -843,61 +839,6 @@ public class OperatorStateBackendTest {
 		}
 	}
 
-	@Test
-	public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
-		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
-
-		// write some state
-		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
-		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
-
-		listState1.add(42);
-		listState1.add(4711);
-
-		listState2.add(7);
-		listState2.add(13);
-		listState2.add(23);
-
-		listState3.add(17);
-		listState3.add(18);
-		listState3.add(19);
-		listState3.add(20);
-
-		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
-		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
-		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
-
-		try {
-
-			operatorStateBackend.close();
-			operatorStateBackend.dispose();
-
-			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
-				new DummyEnvironment(
-					new ArtificialCNFExceptionThrowingClassLoader(
-						getClass().getClassLoader(),
-						Collections.singleton(JavaSerializer.class.getName()))),
-				"testOperator");
-
-			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
-
-			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		} finally {
-			stateHandle.discardState();
-		}
-	}
-
 	static final class MutableType implements Serializable {
 
 		private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f51ab31..f1269fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,10 +38,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -52,11 +49,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -80,9 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
@@ -93,12 +83,10 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -107,7 +95,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
@@ -970,282 +957,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testStateSerializerReconfiguration() throws Exception {
-
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		Environment env = new DummyEnvironment();
-
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
-
-		try {
-			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// ============== create snapshot, using the non-reconfigured serializer ==============
-
-			// make some modifications
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
-
-			// verify that our assumption that the serializer is not yet reconfigured;
-			// we cast the state handle to the internal representation in order to retrieve the serializer
-			InternalKvState internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertFalse(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			KeyedStateHandle snapshot1 = runSnapshot(
-				backend.snapshot(
-					682375462378L,
-					2,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			backend.dispose();
-
-			// ========== restore snapshot, which should reconfigure the serializer, and then create a snapshot again ==========
-
-			env = new DummyEnvironment();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
-
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// verify that the serializer used is correctly reconfigured
-			internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			backend.setCurrentKey(1);
-			TestCustomStateClass restoredState1 = state.value();
-			assertEquals("test-message-1", restoredState1.getMessage());
-			// the previous serializer schema does not contain the extra message
-			assertNull(restoredState1.getExtraMessage());
-
-			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
-
-			backend.setCurrentKey(2);
-			TestCustomStateClass restoredState2 = state.value();
-			assertEquals("test-message-2", restoredState2.getMessage());
-			assertNull(restoredState1.getExtraMessage());
-
-			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
-
-			KeyedStateHandle snapshot2 = runSnapshot(
-				backend.snapshot(
-					682375462379L,
-					3,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			snapshot1.discardState();
-			backend.dispose();
-
-			// ========== restore snapshot again; state should now be in the new schema containing the extra message ==========
-
-			env = new DummyEnvironment();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
-
-			snapshot2.discardState();
-
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			backend.setCurrentKey(1);
-			restoredState1 = state.value();
-			assertEquals("new-test-message-1", restoredState1.getMessage());
-			assertEquals("extra-message-1", restoredState1.getExtraMessage());
-
-			backend.setCurrentKey(2);
-			restoredState2 = state.value();
-			assertEquals("new-test-message-2", restoredState2.getMessage());
-			assertEquals("extra-message-2", restoredState2.getExtraMessage());
-		} finally {
-			backend.dispose();
-		}
-	}
-
-	@Test
-	public void testSerializerPresenceOnRestore() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		Environment env = new DummyEnvironment();
-
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
-
-		try {
-			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerPreUpgrade());
-			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// ============== create snapshot, using the old serializer ==============
-
-			// make some modifications
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
-
-			KeyedStateHandle snapshot1 = runSnapshot(
-				backend.snapshot(
-					682375462378L,
-					2,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			backend.dispose();
-
-			// ========== restore snapshot, using the new serializer (that has different classname) ==========
-
-			// on restore, simulate that the previous serializer class is no longer in the classloader
-			env = new DummyEnvironment(
-				new ArtificialCNFExceptionThrowingClassLoader(
-					getClass().getClassLoader(),
-					Collections.singleton(TestReconfigurableCustomTypeSerializerPreUpgrade.class.getName())));
-
-			try {
-				backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
-			} catch (IOException e) {
-				if (!isSerializerPresenceRequiredOnRestore()) {
-					fail("Presence of old serializer should not have been required.");
-				} else {
-					// test success
-					return;
-				}
-			}
-
-			// if serializer presence is not required, continue on to modify some state to make sure that everything works correctly
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerUpgraded());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
-
-			KeyedStateHandle snapshot2 = runSnapshot(
-				backend.snapshot(
-					682375462379L,
-					3,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			snapshot1.discardState();
-		} finally {
-			backend.dispose();
-		}
-	}
-
-	@Test
-	public void testPriorityQueueSerializerUpdates() throws Exception {
-
-		final String stateName = "test";
-		final CheckpointStreamFactory streamFactory = createStreamFactory();
-		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
-		AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		try {
-			TypeSerializer<InternalPriorityQueueTestBase.TestElement> serializer =
-				InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
-
-			KeyGroupedInternalPriorityQueue<InternalPriorityQueueTestBase.TestElement> priorityQueue =
-				keyedBackend.create(stateName, serializer);
-
-			priorityQueue.add(new InternalPriorityQueueTestBase.TestElement(42L, 0L));
-
-			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
-				keyedBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-			KeyedStateHandle keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
-
-			keyedBackend.dispose();
-
-			// test restore with a modified but compatible serializer ---------------------------
-
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-
-			serializer = new ModifiedTestElementSerializer();
-
-			priorityQueue = keyedBackend.create(stateName, serializer);
-
-			final InternalPriorityQueueTestBase.TestElement checkElement =
-				new InternalPriorityQueueTestBase.TestElement(4711L, 1L);
-			priorityQueue.add(checkElement);
-
-			snapshot = keyedBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-			keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
-
-			keyedBackend.dispose();
-
-			// test that the modified serializer was actually used ---------------------------
-
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-			priorityQueue = keyedBackend.create(stateName, serializer);
-
-			priorityQueue.poll();
-
-			ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
-			DataOutputViewStreamWrapper outWrapper = new DataOutputViewStreamWrapper(out);
-			serializer.serialize(checkElement, outWrapper);
-			InternalPriorityQueueTestBase.TestElement expected =
-				serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(out.toByteArray())));
-
-			Assert.assertEquals(
-				expected,
-				priorityQueue.poll());
-			Assert.assertTrue(priorityQueue.isEmpty());
-
-			keyedBackend.dispose();
-
-			// test that incompatible serializer is rejected ---------------------------
-
-			serializer = InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-
-			try {
-				// this is expected to fail, because the old and new serializer shoulbe be incompatible through
-				// different revision numbers.
-				keyedBackend.create("test", serializer);
-				Assert.fail("Expected exception from incompatible serializer.");
-			} catch (Exception e) {
-				Assert.assertTrue("Exception was not caused by state migration: " + e,
-					ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
-		} finally {
-			keyedBackend.dispose();
-		}
-	}
-
-	public static class ModifiedTestElementSerializer extends InternalPriorityQueueTestBase.TestElementSerializer {
-
-		@Override
-		public void serialize(InternalPriorityQueueTestBase.TestElement record, DataOutputView target) throws IOException {
-			super.serialize(new InternalPriorityQueueTestBase.TestElement(record.getKey() + 1, record.getPriority() + 1), target);
-		}
-
-		@Override
-		protected int getRevision() {
-			return super.getRevision() + 1;
-		}
-	}
-
-	@Test
 	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -4573,190 +4284,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
-	 * Custom state class used for testing state serializer schema migration.
-	 * The corresponding serializer used in the tests is {@link TestReconfigurableCustomTypeSerializer}.
-	 */
-	public static class TestCustomStateClass {
-
-		private String message;
-		private String extraMessage;
-
-		public TestCustomStateClass(String message, String extraMessage) {
-			this.message = message;
-			this.extraMessage = extraMessage;
-		}
-
-		public String getMessage() {
-			return message;
-		}
-
-		public void setMessage(String message) {
-			this.message = message;
-		}
-
-		public String getExtraMessage() {
-			return extraMessage;
-		}
-
-		public void setExtraMessage(String extraMessage) {
-			this.extraMessage = extraMessage;
-		}
-	}
-
-	/**
-	 * A reconfigurable serializer that simulates backwards compatible schema evolution for the {@link TestCustomStateClass}.
-	 * A flag is maintained to determine whether or not the serializer has be reconfigured.
-	 * Whether or not it has been reconfigured affects which fields of {@link TestCustomStateClass} instances are
-	 * written and read on serialization.
-	 */
-	public static class TestReconfigurableCustomTypeSerializer extends TypeSerializer<TestCustomStateClass> {
-
-		private boolean reconfigured = false;
-
-		public TestReconfigurableCustomTypeSerializer() {}
-
-		/** Copy constructor. */
-		private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
-			this.reconfigured = reconfigured;
-		}
-
-		@Override
-		public TypeSerializer<TestCustomStateClass> duplicate() {
-			return new TestReconfigurableCustomTypeSerializer(reconfigured);
-		}
-
-		@Override
-		public TestCustomStateClass createInstance() {
-			return new TestCustomStateClass(null, null);
-		}
-
-		@Override
-		public void serialize(TestCustomStateClass record, DataOutputView target) throws IOException {
-			target.writeBoolean(reconfigured);
-
-			target.writeUTF(record.getMessage());
-			if (reconfigured) {
-				target.writeUTF(record.getExtraMessage());
-			}
-		}
-
-		@Override
-		public TestCustomStateClass deserialize(DataInputView source) throws IOException {
-			boolean isNewSchema = source.readBoolean();
-
-			String message = source.readUTF();
-			if (isNewSchema) {
-				return new TestCustomStateClass(message, source.readUTF());
-			} else {
-				return new TestCustomStateClass(message, null);
-			}
-		}
-
-		@Override
-		public TestCustomStateClass deserialize(TestCustomStateClass reuse, DataInputView source) throws IOException {
-			boolean isNewSchema = source.readBoolean();
-
-			String message = source.readUTF();
-			if (isNewSchema) {
-				reuse.setMessage(message);
-				reuse.setExtraMessage(source.readUTF());
-				return reuse;
-			} else {
-				reuse.setMessage(message);
-				reuse.setExtraMessage(null);
-				return reuse;
-			}
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			boolean reconfigured = source.readBoolean();
-
-			target.writeUTF(source.readUTF());
-			if (reconfigured)
-			target.writeUTF(source.readUTF());
-		}
-
-		@Override
-		public TestCustomStateClass copy(TestCustomStateClass from) {
-			return new TestCustomStateClass(from.getMessage(), from.getExtraMessage());
-		}
-
-		@Override
-		public TestCustomStateClass copy(TestCustomStateClass from, TestCustomStateClass reuse) {
-			reuse.setMessage(from.getMessage());
-			reuse.setExtraMessage(from.getExtraMessage());
-			return reuse;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof TestReconfigurableCustomTypeSerializer;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj == null) {
-				return false;
-			}
-
-			if (!(obj instanceof TestReconfigurableCustomTypeSerializer)) {
-				return false;
-			}
-
-			if (obj == this) {
-				return true;
-			} else {
-				TestReconfigurableCustomTypeSerializer other = (TestReconfigurableCustomTypeSerializer) obj;
-				return other.reconfigured == this.reconfigured;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(getClass().getName(), reconfigured);
-		}
-
-		// -- reconfiguration --
-
-		public boolean isReconfigured() {
-			return reconfigured;
-		}
-
-		// -- config snapshot --
-
-		@Override
-		public TypeSerializerConfigSnapshot<TestCustomStateClass> snapshotConfiguration() {
-			return new ParameterlessTypeSerializerConfig<>(getClass().getName());
-		}
-
-		@Override
-		public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof ParameterlessTypeSerializerConfig &&
-					((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
-
-				this.reconfigured = true;
-				return CompatibilityResult.compatible();
-			} else {
-				return CompatibilityResult.requiresMigration();
-			}
-		}
-	}
-
-	public static class TestReconfigurableCustomTypeSerializerPreUpgrade extends TestReconfigurableCustomTypeSerializer {}
-	public static class TestReconfigurableCustomTypeSerializerUpgraded extends TestReconfigurableCustomTypeSerializer {}
-
-	/**
 	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
 	 */
 	private static class ExpectedKryoTestException extends RuntimeException {}