Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/08 18:10:31 UTC

[flink] branch master updated (390926e -> 597f502)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 390926e  [FLINK-17637][connector] fix the unstable HadoopS3RecoverableWriterITCase#testCleanupRecoverableState
     new be2cc99  [FLINK-14942][state-processor-api] Support Savepoint deep copy
     new 597f502  [FLINK-14942][docs] remove the "shallow copy" note in "Modifying savepoints" section, fix example

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/libs/state_processor_api.md               |   2 -
 docs/dev/libs/state_processor_api.zh.md            |   2 -
 .../apache/flink/state/api/WritableSavepoint.java  |  35 ++--
 .../flink/state/api/output/FileCopyFunction.java   |  74 ++++++++
 .../flink/state/api/output/StatePathExtractor.java | 104 +++++++++++
 .../flink/state/api/SavepointDeepCopyTest.java     | 197 +++++++++++++++++++++
 .../flink/state/api/SavepointWriterITCase.java     |  49 +++--
 7 files changed, 413 insertions(+), 50 deletions(-)
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
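
For context: the "Savepoint deep copy" commit below changes what happens when a
new savepoint is derived from an existing one. The state files referenced by
the existing savepoint are now physically copied into the new savepoint's
directory instead of being shared by pointer. A minimal usage sketch, modelled
on the SavepointDeepCopyTest added in this change (the paths, the uid
"Operator2", and the bootstrap data are illustrative, not part of the commit):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.StateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class SavepointDeepCopyExample {

        /** Writes each key into keyed state (illustrative bootstrapper). */
        static class WordBootstrapper extends KeyedStateBootstrapFunction<String, String> {
            private ValueState<String> state;

            @Override
            public void open(Configuration parameters) {
                state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", Types.STRING));
            }

            @Override
            public void processElement(String value, Context ctx) throws Exception {
                state.update(value);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            StateBackend backend = new FsStateBackend(new Path("file:///tmp").toUri(), 1);

            DataSet<String> words = env.fromElements("the", "quick", "brown", "fox");
            BootstrapTransformation<String> transformation = OperatorTransformation
                .bootstrapWith(words)
                .keyBy(w -> w)
                .transform(new WordBootstrapper());

            // Derive savepoint2 from savepoint1: with the deep copy, the state
            // files referenced by savepoint1 are physically copied into
            // savepoint2's directory, so deleting one savepoint no longer
            // corrupts the other.
            Savepoint.load(env, "file:///tmp/savepoint1", backend)
                .withOperator("Operator2", transformation)
                .write("file:///tmp/savepoint2");

            env.execute("create savepoint2");
        }
    }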


[flink] 02/02: [FLINK-14942][docs] remove the "shallow copy" note in "Modifying savepoints" section, fix example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 597f5027c5b0277a80448f988c11f314449d270f
Author: Jun Qin <11...@users.noreply.github.com>
AuthorDate: Wed Sep 2 14:28:34 2020 +0200

    [FLINK-14942][docs] remove the "shallow copy" note in "Modifying savepoints" section, fix example
    
    This closes #13309
---
 docs/dev/libs/state_processor_api.md    | 2 --
 docs/dev/libs/state_processor_api.zh.md | 2 --
 2 files changed, 4 deletions(-)

diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.md
index 0443883..9253317 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -598,5 +598,3 @@ Savepoint
 {% endhighlight %}
 </div>
 </div>
-
-{% panel **Note:** When basing a new savepoint on existing state, the state processor api makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other! %}
diff --git a/docs/dev/libs/state_processor_api.zh.md b/docs/dev/libs/state_processor_api.zh.md
index bb23a5a..ca04a73 100644
--- a/docs/dev/libs/state_processor_api.zh.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -612,5 +612,3 @@ Savepoint
 {% endhighlight %}
 </div>
 </div>
-
-{% panel **Note:** When basing a new savepoint on existing state, the state processor api makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other! %}
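
The note removed above warned that a savepoint derived from an existing one
only held shallow pointers into the original's state files. With the deep copy
introduced in commit 01/02 below, that caveat no longer applies. A minimal
sketch of how the copy can be checked at the file level, mirroring the
assertion in SavepointDeepCopyTest (the savepoint paths are illustrative):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class DeepCopyCheck {

        /** Returns the names of the files directly under a savepoint directory. */
        static Set<String> stateFiles(String dir) throws IOException {
            try (Stream<Path> files = Files.list(Paths.get(dir))) {
                return files
                    .map(p -> p.getFileName().toString())
                    .collect(Collectors.toSet());
            }
        }

        public static void main(String[] args) throws IOException {
            // Every state file of the original savepoint must also be present
            // in the directory of the savepoint derived from it.
            Set<String> original = stateFiles("/tmp/savepoint1");
            Set<String> derived = stateFiles("/tmp/savepoint2");
            if (!derived.containsAll(original)) {
                throw new AssertionError(
                    "savepoint2 is missing state files copied from savepoint1");
            }
        }
    }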


[flink] 01/02: [FLINK-14942][state-processor-api] Support Savepoint deep copy

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be2cc99e6b9fc734a8e920f66ebb035ab86bbe7e
Author: Jun Qin <11...@users.noreply.github.com>
AuthorDate: Sun Aug 30 16:46:47 2020 +0200

    [FLINK-14942][state-processor-api] Support Savepoint deep copy
---
 .../apache/flink/state/api/WritableSavepoint.java  |  35 ++--
 .../flink/state/api/output/FileCopyFunction.java   |  74 ++++++++
 .../flink/state/api/output/StatePathExtractor.java | 104 +++++++++++
 .../flink/state/api/SavepointDeepCopyTest.java     | 197 +++++++++++++++++++++
 .../flink/state/api/SavepointWriterITCase.java     |  49 +++--
 5 files changed, 413 insertions(+), 46 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index cd24e63..ce2833f 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -22,8 +22,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.state.api.output.FileCopyFunction;
 import org.apache.flink.state.api.output.MergeOperatorStates;
 import org.apache.flink.state.api.output.SavepointOutputFormat;
+import org.apache.flink.state.api.output.StatePathExtractor;
 import org.apache.flink.state.api.runtime.BootstrapTransformationWithID;
 import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
 import org.apache.flink.util.Preconditions;
@@ -90,38 +92,37 @@ public abstract class WritableSavepoint<F extends WritableSavepoint> {
 
 		List<OperatorState> existingOperators = metadata.getExistingOperators();
 
-		DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
-
-		finalOperatorStates
-			.reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
-			.name("reduce(OperatorState)")
-			.output(new SavepointOutputFormat(savepointPath))
-			.name(path);
-	}
-
-	private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
 		DataSet<OperatorState> finalOperatorStates;
 		if (existingOperators.isEmpty()) {
 			finalOperatorStates = newOperatorStates;
 		} else {
-			DataSet<OperatorState> wrappedCollection = newOperatorStates
-				.getExecutionEnvironment()
+			DataSet<OperatorState> existingOperatorStates = newOperatorStates.getExecutionEnvironment()
 				.fromCollection(existingOperators);
 
-			finalOperatorStates = newOperatorStates.union(wrappedCollection);
+			existingOperatorStates
+				.flatMap(new StatePathExtractor())
+				.setParallelism(1)
+				.output(new FileCopyFunction(path));
+
+			finalOperatorStates = newOperatorStates.union(existingOperatorStates);
+
 		}
-		return finalOperatorStates;
+		finalOperatorStates
+			.reduceGroup(new MergeOperatorStates(metadata.getMasterStates()))
+			.name("reduce(OperatorState)")
+			.output(new SavepointOutputFormat(savepointPath))
+			.name(path);
 	}
 
 	private DataSet<OperatorState> writeOperatorStates(
-			List<BootstrapTransformationWithID<?>> newOperatorStates,
-			Path savepointWritePath) {
+		List<BootstrapTransformationWithID<?>> newOperatorStates,
+		Path savepointWritePath) {
 		return newOperatorStates
 			.stream()
 			.map(newOperatorState -> newOperatorState
 				.getBootstrapTransformation()
 				.writeOperatorState(newOperatorState.getOperatorID(), stateBackend, metadata.getMaxParallelism(), savepointWritePath))
 			.reduce(DataSet::union)
-			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
+			.orElseThrow(() -> new IllegalStateException("Savepoint must contain at least one operator"));
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
new file mode 100644
index 0000000..0ed1ed6
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * This output format copies files from an existing savepoint into a new directory.
+ */
+@Internal
+public final class FileCopyFunction implements OutputFormat<String> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String path;
+
+	/**
+	 * @param path the destination path to copy files to
+	 */
+	public FileCopyFunction(String path) {
+		this.path = Preconditions.checkNotNull(path, "The destination path cannot be null");
+	}
+
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		// Create the destination parent directory before copy.
+		// It is not a problem if it exists already.
+		Path destParent = new Path(path);
+		destParent.getFileSystem().mkdirs(destParent);
+	}
+
+	@Override
+	public void writeRecord(String record) throws IOException {
+		Path sourcePath = new Path(record);
+		Path destPath = new Path(path, sourcePath.getName());
+		try (
+			FSDataOutputStream os = destPath.getFileSystem().create(destPath, FileSystem.WriteMode.NO_OVERWRITE);
+			FSDataInputStream is = sourcePath.getFileSystem().open(sourcePath)) {
+			IOUtils.copyBytes(is, os);
+		}
+	}
+
+	@Override
+	public void configure(Configuration parameters) { }
+
+	@Override
+	public void close() throws IOException { }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
new file mode 100644
index 0000000..53832ef
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.output;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+/**
+ * Extracts all file paths that are part of the provided {@link OperatorState}.
+ */
+@Internal
+public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+		for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
+			// managed operator state
+			for (OperatorStateHandle operatorStateHandle: subTaskState.getManagedOperatorState()) {
+				Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
+				if (path != null) {
+					out.collect(path.getPath());
+				}
+			}
+			// managed keyed state
+			for (KeyedStateHandle keyedStateHandle: subTaskState.getManagedKeyedState()) {
+				if (keyedStateHandle instanceof KeyGroupsStateHandle) {
+					Path path = getStateFilePathFromStreamStateHandle((KeyGroupsStateHandle) keyedStateHandle);
+					if (path != null) {
+						out.collect(path.getPath());
+					}
+				}
+			}
+			// raw operator state
+			for (OperatorStateHandle operatorStateHandle: subTaskState.getRawOperatorState()) {
+				Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
+				if (path != null) {
+					out.collect(path.getPath());
+				}
+			}
+			// raw keyed state
+			for (KeyedStateHandle keyedStateHandle: subTaskState.getRawKeyedState()) {
+				if (keyedStateHandle instanceof KeyGroupsStateHandle) {
+					Path path = getStateFilePathFromStreamStateHandle((KeyGroupsStateHandle) keyedStateHandle);
+					if (path != null) {
+						out.collect(path.getPath());
+					}
+				}
+			}
+		}
+	}
+
+
+	/**
+	 * This method recursively looks for the contained {@link FileStateHandle}s in a given {@link StreamStateHandle}.
+	 *
+	 * @param handle the {@code StreamStateHandle} to check for a contained {@code FileStateHandle}
+	 * @return the file path if the given {@code StreamStateHandle} contains a {@code FileStateHandle} object, null
+	 * otherwise
+	 */
+	private @Nullable Path getStateFilePathFromStreamStateHandle(StreamStateHandle handle) {
+		if (handle instanceof FileStateHandle) {
+			return ((FileStateHandle) handle).getFilePath();
+		} else if (handle instanceof OperatorStateHandle) {
+			return getStateFilePathFromStreamStateHandle(
+				((OperatorStateHandle) handle).getDelegateStateHandle());
+		} else if (handle instanceof KeyedStateHandle) {
+			if (handle instanceof KeyGroupsStateHandle) {
+				return getStateFilePathFromStreamStateHandle(
+					((KeyGroupsStateHandle) handle).getDelegateStateHandle());
+			}
+			// other KeyedStateHandles either do not contain a FileStateHandle or are not part of a savepoint
+		}
+		return null;
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
new file mode 100644
index 0000000..cbd31e8
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointDeepCopyTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
+import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test the savepoint deep copy.
+ */
+@RunWith(value = Parameterized.class)
+public class SavepointDeepCopyTest extends AbstractTestBase {
+
+	private static final String TEXT = "The quick brown fox jumps over the lazy dog";
+	private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric(120);
+	private static final int FILE_STATE_SIZE_THRESHOLD = 1;
+
+	private final StateBackend backend;
+
+	public SavepointDeepCopyTest(StateBackend backend) throws Exception {
+		this.backend = backend;
+	}
+
+	@Parameterized.Parameters(name = "State Backend: {0}")
+	public static Collection<StateBackend> data() {
+		return Arrays.asList(
+			new FsStateBackend(new Path("file:///tmp").toUri(), FILE_STATE_SIZE_THRESHOLD),
+			new RocksDBStateBackend(new FsStateBackend(new Path("file:///tmp").toUri(), FILE_STATE_SIZE_THRESHOLD)));
+	}
+
+	/**
+	 * Bootstraps a savepoint for testing.
+	 */
+	static class WordMapBootstrapper extends KeyedStateBootstrapFunction<String, String> {
+		private ValueState<Tuple2<String, String>> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			ValueStateDescriptor<Tuple2<String, String>> descriptor = new ValueStateDescriptor<>(
+				"state", Types.TUPLE(Types.STRING, Types.STRING));
+			state = getRuntimeContext().getState(descriptor);
+		}
+
+		@Override
+		public void processElement(String value, Context ctx) throws Exception {
+			if (state.value() == null) {
+				state.update(new Tuple2<>(value, RANDOM_VALUE));
+			}
+		}
+	}
+
+	/**
+	 * Reads the state back from the newly created savepoint.
+	 */
+	static class ReadFunction extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
+
+		private ValueState<Tuple2<String, String>> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			ValueStateDescriptor<Tuple2<String, String>> stateDescriptor = new ValueStateDescriptor<>(
+				"state", Types.TUPLE(Types.STRING, Types.STRING));
+			state = getRuntimeContext().getState(stateDescriptor);
+		}
+
+		@Override
+		public void readKey(
+			String key,
+			Context ctx,
+			Collector<Tuple2<String, String>> out) throws Exception {
+			out.collect(state.value());
+		}
+	}
+
+	/**
+	 * Tests the savepoint deep copy by:
+	 * <ul>
+	 * <li>creating {@code savepoint1} with operator {@code Operator1}, making sure it has state files in addition to
+	 * _metadata
+	 * <li>creating {@code savepoint2} from {@code savepoint1} by adding a new operator {@code Operator2}
+	 * <li>checking that all state files in {@code savepoint1}'s directory are copied over to {@code savepoint2}'s directory
+	 * <li>reading the state of {@code Operator1} from {@code savepoint2} and making sure the number of keys remains the same
+	 * </ul>
+	 * @throws Exception if anything goes wrong
+	 */
+	@Test
+	public void testSavepointDeepCopy() throws Exception {
+
+		// set up the execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// construct DataSet
+		DataSet<String> words = env.fromElements(TEXT.split(" "));
+
+		// create BootstrapTransformation
+		BootstrapTransformation<String> transformation = OperatorTransformation
+			.bootstrapWith(words)
+			.keyBy(e -> e)
+			.transform(new WordMapBootstrapper());
+
+		File savepointUrl1 = createAndRegisterTempFile(new AbstractID().toHexString());
+		String savepointPath1 = savepointUrl1.getPath();
+
+		// create a savepoint with BootstrapTransformations (one per operator)
+		// write the created savepoint to a given path
+		Savepoint.create(backend, 128)
+			.withOperator("Operator1", transformation)
+			.write(savepointPath1);
+
+		env.execute("bootstrap savepoint1");
+
+		Assert.assertTrue(
+			"Failed to bootstrap savepoint1 with additional state files",
+			Files.list(Paths.get(savepointPath1)).count() > 1);
+
+		Set<String> stateFiles1 = Files.list(Paths.get(savepointPath1))
+			.map(path -> path.getFileName().toString())
+			.collect(Collectors.toSet());
+
+		// create savepoint2 from savepoint1 created above
+		File savepointUrl2 = createAndRegisterTempFile(new AbstractID().toHexString());
+		String savepointPath2 = savepointUrl2.getPath();
+
+		ExistingSavepoint savepoint2 = Savepoint.load(env, savepointPath1, backend);
+		savepoint2
+			.withOperator("Operator2", transformation)
+			.write(savepointPath2);
+		env.execute("create savepoint2");
+
+		Assert.assertTrue("Failed to create savepoint2 from savepoint1 with additional state files",
+			Files.list(Paths.get(savepointPath2)).count() > 1);
+
+		Set<String> stateFiles2 = Files.list(Paths.get(savepointPath2))
+			.map(path -> path.getFileName().toString())
+			.collect(Collectors.toSet());
+
+		assertThat("At least one state file in savepoint1 is not in savepoint2",
+			stateFiles1, everyItem(isIn(stateFiles2)));
+
+		// Load savepoint2 and read the state of "Operator1" (which was not touched when savepoint2
+		// was created), and make sure the number of keys remains the same
+		long actualKeyNum = Savepoint.load(env, savepointPath2, backend)
+			.readKeyedState("Operator1", new ReadFunction()).count();
+		long expectedKeyNum = Arrays.stream(TEXT.split(" ")).distinct().count();
+		Assert.assertEquals(
+			"Unexpected number of keys in the state of Operator1",
+			expectedKeyNum, actualKeyNum);
+	}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 5dd3fcc..e5eb824 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
 import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
 import org.apache.flink.state.api.functions.StateBootstrapFunction;
@@ -53,8 +53,6 @@ import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -69,8 +67,9 @@ import java.util.concurrent.ConcurrentSkipListSet;
 /**
  * IT test for writing savepoints.
  */
-@RunWith(value = Parameterized.class)
 public class SavepointWriterITCase extends AbstractTestBase {
+	private static final int FILE_STATE_SIZE = 1;
+
 	private static final String ACCOUNT_UID = "accounts";
 
 	private static final String CURRENCY_UID = "currency";
@@ -80,8 +79,6 @@ public class SavepointWriterITCase extends AbstractTestBase {
 	private static final MapStateDescriptor<String, Double> descriptor = new MapStateDescriptor<>(
 		"currency-rate", Types.STRING, Types.DOUBLE);
 
-	private final StateBackend backend;
-
 	private static final Collection<Account> accounts = Arrays.asList(
 		new Account(1, 100.0),
 		new Account(2, 100.0),
@@ -89,40 +86,34 @@ public class SavepointWriterITCase extends AbstractTestBase {
 
 	private static final Collection<CurrencyRate> currencyRates = Arrays.asList(
 		new CurrencyRate("USD", 1.0),
-		new CurrencyRate("EUR", 1.3)
-	);
-
-	public SavepointWriterITCase(StateBackend backend) throws Exception {
-		this.backend = backend;
+		new CurrencyRate("EUR", 1.3));
 
-		//reset the cluster so we can change the state backend
-		miniClusterResource.after();
-		miniClusterResource.before();
+	@Test
+	public void testFsStateBackend() throws Exception {
+		testStateBootstrapAndModification(new FsStateBackend(TEMPORARY_FOLDER.newFolder().toURI(), FILE_STATE_SIZE));
 	}
 
-	@Parameterized.Parameters(name = "Savepoint Writer: {0}")
-	public static Collection<StateBackend> data() {
-		return Arrays.asList(
-			new MemoryStateBackend(),
-			new RocksDBStateBackend((StateBackend) new MemoryStateBackend()));
+	@Test
+	public void testRocksDBStateBackend() throws Exception {
+		StateBackend backend = new RocksDBStateBackend(new FsStateBackend(TEMPORARY_FOLDER.newFolder().toURI(), FILE_STATE_SIZE));
+		testStateBootstrapAndModification(backend);
 	}
 
-	@Test
-	public void testStateBootstrapAndModification() throws Exception {
+	public void testStateBootstrapAndModification(StateBackend backend) throws Exception {
 		final String savepointPath = getTempDirPath(new AbstractID().toHexString());
 
-		bootstrapState(savepointPath);
+		bootstrapState(backend, savepointPath);
 
-		validateBootstrap(savepointPath);
+		validateBootstrap(backend, savepointPath);
 
 		final String modifyPath = getTempDirPath(new AbstractID().toHexString());
 
-		modifySavepoint(savepointPath, modifyPath);
+		modifySavepoint(backend, savepointPath, modifyPath);
 
-		validateModification(modifyPath);
+		validateModification(backend, modifyPath);
 	}
 
-	private void bootstrapState(String savepointPath) throws Exception {
+	private void bootstrapState(StateBackend backend, String savepointPath) throws Exception {
 		ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
@@ -147,7 +138,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Bootstrap");
 	}
 
-	private void validateBootstrap(String savepointPath) throws Exception {
+	private void validateBootstrap(StateBackend backend, String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);
 
@@ -180,7 +171,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		Assert.assertEquals("Unexpected output", 3, CollectSink.accountList.size());
 	}
 
-	private void modifySavepoint(String savepointPath, String modifyPath) throws Exception {
+	private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath) throws Exception {
 		ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Integer> data = bEnv.fromElements(1, 2, 3);
@@ -198,7 +189,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
 		bEnv.execute("Modifying");
 	}
 
-	private void validateModification(String savepointPath) throws Exception {
+	private void validateModification(StateBackend backend, String savepointPath) throws Exception {
 		StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 		sEnv.setStateBackend(backend);