You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/17 11:53:52 UTC

[flink] 01/06: [hotfix] [tests] Remove unfruitful MigrationTestUtil class

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b30adb0ce90d8f81f8028aeb5d541a3a67e6543d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jan 10 14:58:43 2019 +0100

    [hotfix] [tests] Remove unfruitful MigrationTestUtil class
    
    That utility class had a single helper method, restoreFromSnapshot,
    which accepts the target snapshot's Flink version. This was useful
    before, because the way to restore snapshots was a bit different for
    Flink <= 1.1 and newer versions.
    
    Since we now no longer support compatibility for 1.1 versions and
    below, this helper method is simply forwarding the restore operation
    to the test harness.
    
    This commit refactors this have equivalent behaviour directly in the
    AbstractStreamOperatorTestHarness class.
---
 .../fs/bucketing/BucketingSinkMigrationTest.java   |  7 +---
 .../kafka/FlinkKafkaConsumerBaseMigrationTest.java | 25 ++++--------
 .../kafka/FlinkKafkaConsumerBaseTest.java          |  2 +-
 .../kinesis/FlinkKinesisConsumerMigrationTest.java | 19 +++++-----
 .../kinesis/FlinkKinesisConsumerTest.java          |  2 +-
 .../ContinuousFileProcessingMigrationTest.java     | 13 ++-----
 .../flink/cep/operator/CEPMigrationTest.java       | 28 ++++++--------
 .../windowing/WindowOperatorMigrationTest.java     | 43 +++++++--------------
 .../util/AbstractStreamOperatorTestHarness.java    | 10 ++++-
 .../util/migration/MigrationTestUtil.java          | 44 ----------------------
 10 files changed, 59 insertions(+), 134 deletions(-)

diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
index 44be702..8d865b3 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
@@ -167,11 +166,9 @@ public class BucketingSinkMigrationTest {
 			new StreamSink<>(sink), 10, 1, 0);
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"bucketing-sink-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
index be468a0..fbb3732 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
@@ -207,11 +206,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));
 
 		testHarness.open();
 
@@ -248,11 +245,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-empty-state-snapshot"));
 
 		testHarness.open();
 
@@ -302,11 +297,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 		testHarness.setup();
 
 		// restore state from binary snapshot file
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -349,11 +342,9 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
 		// restore state from binary snapshot file; should fail since discovery is enabled
 		try {
-			MigrationTestUtil.restoreFromSnapshot(
-				testHarness,
+			testHarness.initializeState(
 				OperatorSnapshotUtil.getResourceFilename(
-					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"),
-				testMigrateVersion);
+					"kafka-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 			fail("Restore from savepoints from version before Flink 1.3.x should have failed if discovery is enabled.");
 		} catch (Exception e) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index dbe7630..40bb580 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -515,7 +515,7 @@ public class FlinkKafkaConsumerBaseTest {
 			testHarnesses[i] = createTestHarness(consumers[i], initialParallelism, i);
 
 			// initializeState() is always called, null signals that we didn't restore
-			testHarnesses[i].initializeState(null);
+			testHarnesses[i].initializeEmptyState();
 			testHarnesses[i].open();
 		}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
index e21a880..f36b661 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 
 import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@@ -148,9 +147,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-empty-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
@@ -204,9 +203,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
@@ -285,9 +284,9 @@ public class FlinkKinesisConsumerMigrationTest {
 			new AbstractStreamOperatorTestHarness<>(consumerOperator, 1, 1, 0);
 
 		testHarness.setup();
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
-			"src/test/resources/kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot", testMigrateVersion);
+		testHarness.initializeState(
+			OperatorSnapshotUtil.getResourceFilename(
+				"kinesis-consumer-migration-test-flink" + testMigrateVersion + "-snapshot"));
 		testHarness.open();
 
 		consumerFunction.run(new TestSourceContext<>());
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 84e18bd..d36d68a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -763,7 +763,7 @@ public class FlinkKinesisConsumerTest {
 		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		testHarness.getExecutionConfig().setAutoWatermarkInterval(autoWatermarkInterval);
 
-		testHarness.initializeState(null);
+		testHarness.initializeEmptyState();
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Watermark> watermarks = new ConcurrentLinkedQueue<>();
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 7f7e0c5..0a3f75e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.OperatingSystem;
 
@@ -176,11 +175,9 @@ public class ContinuousFileProcessingMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"reader-migration-test-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -307,11 +304,9 @@ public class ContinuousFileProcessingMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 061a3c6..0461bd6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 
 import org.junit.Ignore;
@@ -158,10 +157,8 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -320,10 +317,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -486,10 +482,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
@@ -579,10 +574,9 @@ public class CEPMigrationTest {
 		try {
 			harness.setup();
 
-			MigrationTestUtil.restoreFromSnapshot(
-				harness,
-				OperatorSnapshotUtil.getResourceFilename("cep-migration-conditions-flink" + migrateVersion + "-snapshot"),
-				migrateVersion);
+			harness.initializeState(
+				OperatorSnapshotUtil.getResourceFilename(
+					"cep-migration-conditions-flink" + migrateVersion + "-snapshot"));
 
 			harness.open();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index f4883c9..d2ef199 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -53,7 +53,6 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.migration.MigrationTestUtil;
 import org.apache.flink.streaming.util.migration.MigrationVersion;
 import org.apache.flink.util.Collector;
 
@@ -184,11 +183,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-session-with-stateful-trigger-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -278,11 +275,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-session-with-stateful-trigger-mint-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -402,11 +397,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-reduce-event-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -513,11 +506,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-apply-event-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -620,11 +611,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-reduce-processing-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -719,11 +708,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-apply-processing-time-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
@@ -837,11 +824,9 @@ public class WindowOperatorMigrationTest {
 
 		testHarness.setup();
 
-		MigrationTestUtil.restoreFromSnapshot(
-			testHarness,
+		testHarness.initializeState(
 			OperatorSnapshotUtil.getResourceFilename(
-				"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"),
-			testMigrateVersion);
+				"win-op-migration-test-kryo-serialized-key-flink" + testMigrateVersion + "-snapshot"));
 
 		testHarness.open();
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 0e120eb..3af630a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -340,6 +340,14 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 		initializeState(operatorStateHandles, null);
 	}
 
+	public void initializeState(String operatorStateSnapshotPath) throws Exception {
+		initializeState(OperatorSnapshotUtil.readStateHandle(operatorStateSnapshotPath));
+	}
+
+	public void initializeEmptyState() throws Exception {
+		initializeState((OperatorSubtaskState) null);
+	}
+
 	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState()}.
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
@@ -485,7 +493,7 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {
 	 */
 	public void open() throws Exception {
 		if (!initializeCalled) {
-			initializeState(null);
+			initializeEmptyState();
 		}
 		operator.open();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
deleted file mode 100644
index 1c95a04..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.migration;
-
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-
-/**
- * Utility methods for testing snapshot migrations.
- */
-public class MigrationTestUtil {
-
-	/**
-	 * Restore from a snapshot taken with an older Flink version.
-	 *
-	 * @param testHarness          the test harness to restore the snapshot to.
-	 * @param snapshotPath         the absolute path to the snapshot.
-	 * @param snapshotFlinkVersion the Flink version of the snapshot.
-	 * @throws Exception
-	 */
-	public static void restoreFromSnapshot(
-		AbstractStreamOperatorTestHarness<?> testHarness,
-		String snapshotPath,
-		MigrationVersion snapshotFlinkVersion) throws Exception {
-
-		testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath));
-	}
-}