You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/08/21 12:31:36 UTC

[flink] 01/02: [FLINK-10042][state] (part 1) Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aba02eb3fcc4472c3d5f5a0f527960d79c659c31
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Tue Aug 7 15:57:27 2018 +0200

    [FLINK-10042][state] (part 1) Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into full classes
---
 .../flink/runtime/state/SnapshotStrategy.java      |    3 +-
 .../runtime/state/heap/HeapKeyedStateBackend.java  |    5 +
 .../streaming/state/RocksDBKeyedStateBackend.java  | 1071 ++------------------
 .../state/snapshot/RocksFullSnapshotStrategy.java  |  478 +++++++++
 .../snapshot/RocksIncrementalSnapshotStrategy.java |  578 +++++++++++
 .../state/snapshot/RocksSnapshotUtil.java          |   51 +
 .../state/snapshot/SnapshotStrategyBase.java       |   90 ++
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |   27 +-
 .../streaming/state/RocksDBStateBackendTest.java   |    1 +
 9 files changed, 1317 insertions(+), 987 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
index 9139fa7..3ad68af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
@@ -28,8 +28,7 @@ import java.util.concurrent.RunnableFuture;
  *
  * @param <S> type of the returned state object that represents the result of the snapshot operation.
  */
-@FunctionalInterface
-public interface SnapshotStrategy<S extends StateObject> {
+public interface SnapshotStrategy<S extends StateObject> extends CheckpointListener {
 
 	/**
 	 * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bc1e0f5..0e2f16c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -882,6 +882,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			// nothing to do.
+		}
 	}
 
 	private interface StateFactory {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c159976..87c7e55 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -35,9 +35,8 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
-import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
-import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
-import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
+import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
@@ -47,32 +46,22 @@ import org.apache.flink.core.memory.ByteArrayDataInputView;
 import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
-import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
@@ -80,14 +69,11 @@ import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.StateSnapshotTransformer.StateSnapshotTransformFactory;
-import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
@@ -96,25 +82,19 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ResourceGuard;
 import org.apache.flink.util.StateMigrationException;
-import org.apache.flink.util.function.SupplierWithException;
 
-import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -125,7 +105,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -144,12 +123,16 @@ import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+
 /**
  * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
  * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
@@ -167,9 +150,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */
 	public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
-	/** File suffix of sstable files. */
-	private static final String SST_FILE_SUFFIX = ".sst";
-
 	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
 		Stream.of(
 			Tuple2.of(ValueStateDescriptor.class, (StateFactory) RocksDBValueState::create),
@@ -230,7 +210,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
+	private final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -246,20 +226,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** True if incremental checkpointing is enabled. */
 	private final boolean enableIncrementalCheckpointing;
 
-	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
-	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-
-	/** The identifier of the last completed checkpoint. */
-	private long lastCompletedCheckpointId = -1L;
-
-	/** Unique ID of this backend. */
-	private UUID backendUID;
-
 	/** The configuration of local recovery. */
 	private final LocalRecoveryConfig localRecoveryConfig;
 
 	/** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */
-	private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
+	private SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
 
 	/** Factory for priority queue state. */
 	private final PriorityQueueSetFactory priorityQueueFactory;
@@ -314,12 +285,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
 		this.kvStateInformation = new LinkedHashMap<>();
 		this.restoredKvStateMetaInfos = new HashMap<>();
-		this.materializedSstFiles = new TreeMap<>();
-		this.backendUID = UUID.randomUUID();
-
-		this.snapshotStrategy = enableIncrementalCheckpointing ?
-			new IncrementalSnapshotStrategy() :
-			new FullSnapshotStrategy();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
 
@@ -333,8 +298,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			default:
 				throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
 		}
-
-		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
 	}
 
 	private static void checkAndCreateDirectory(File directory) throws IOException {
@@ -508,41 +471,83 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		restoredKvStateMetaInfos.clear();
 
 		try {
+			RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
 			} else {
 				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
 				if (firstStateHandle instanceof IncrementalKeyedStateHandle
 					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-					RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-					restoreOperation.restore(restoreState);
+					incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+					incrementalRestoreOperation.restore(restoreState);
 				} else {
-					RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
-					restoreOperation.doRestore(restoreState);
+					RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+					fullRestoreOperation.doRestore(restoreState);
 				}
 			}
+
+			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();
 			throw ex;
 		}
 	}
 
-	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) {
-
-		if (!enableIncrementalCheckpointing) {
-			return;
-		}
-
-		synchronized (materializedSstFiles) {
-
-			if (completedCheckpointId < lastCompletedCheckpointId) {
-				return;
+	@VisibleForTesting
+	void initializeSnapshotStrategy(
+		@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
+
+		final RocksFullSnapshotStrategy<K> fullSnapshotStrategy =
+			new RocksFullSnapshotStrategy<>(
+				db,
+				rocksDBResourceGuard,
+				keySerializer,
+				kvStateInformation,
+				keyGroupRange,
+				keyGroupPrefixBytes,
+				localRecoveryConfig,
+				cancelStreamRegistry,
+				keyGroupCompressionDecorator);
+
+		if (enableIncrementalCheckpointing) {
+			final UUID backendUID;
+			final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+			final long lastCompletedCheckpointId;
+
+			if (incrementalRestoreOperation == null) {
+				backendUID = UUID.randomUUID();
+				materializedSstFiles = new TreeMap<>();
+				lastCompletedCheckpointId = -1L;
+			} else {
+				backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
+				materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
+				lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId();
+				Preconditions.checkState(lastCompletedCheckpointId >= 0L);
 			}
+			// TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies.
+			this.snapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
+				db,
+				rocksDBResourceGuard,
+				keySerializer,
+				kvStateInformation,
+				keyGroupRange,
+				keyGroupPrefixBytes,
+				localRecoveryConfig,
+				cancelStreamRegistry,
+				instanceBasePath,
+				backendUID,
+				materializedSstFiles,
+				lastCompletedCheckpointId,
+				fullSnapshotStrategy);
+		} else {
+			this.snapshotStrategy = fullSnapshotStrategy;
+		}
+	}
 
-			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
-
-			lastCompletedCheckpointId = completedCheckpointId;
+	@Override
+	public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
+		if (snapshotStrategy != null) {
+			snapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
 		}
 	}
 
@@ -656,10 +661,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		/**
 		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
-		 *
-		 * @throws IOException
-		 * @throws ClassNotFoundException
-		 * @throws RocksDBException
 		 */
 		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
@@ -724,9 +725,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		/**
 		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
-		 *
-		 * @throws IOException
-		 * @throws RocksDBException
 		 */
 		private void restoreKVStateData() throws IOException, RocksDBException {
 			//for all key-groups in the current state handle...
@@ -752,14 +750,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							while (keyGroupHasMoreKeys) {
 								byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
 								byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-								if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+								if (hasMetaDataFollowsFlag(key)) {
 									//clear the signal bit in the key to make it ready for insertion again
-									RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+									clearMetaDataFollowsFlag(key);
 									writeBatchWrapper.put(handle, key, value);
 									//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-									kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+									kvStateId = END_OF_KEY_GROUP_MARK
 										& compressedKgInputView.readShort();
-									if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+									if (END_OF_KEY_GROUP_MARK == kvStateId) {
 										keyGroupHasMoreKeys = false;
 									} else {
 										handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
@@ -781,9 +779,26 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private static class RocksDBIncrementalRestoreOperation<T> {
 
 		private final RocksDBKeyedStateBackend<T> stateBackend;
+		private final SortedMap<Long, Set<StateHandleID>> restoredSstFiles;
+		private UUID restoredBackendUID;
+		private long lastCompletedCheckpointId;
 
 		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
+
 			this.stateBackend = stateBackend;
+			this.restoredSstFiles = new TreeMap<>();
+		}
+
+		SortedMap<Long, Set<StateHandleID>> getRestoredSstFiles() {
+			return restoredSstFiles;
+		}
+
+		UUID getRestoredBackendUID() {
+			return restoredBackendUID;
+		}
+
+		long getLastCompletedCheckpointId() {
+			return lastCompletedCheckpointId;
 		}
 
 		/**
@@ -872,6 +887,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
 
+			this.restoredBackendUID = UUID.randomUUID();
+
 			initTargetDB(restoreStateHandles, stateBackend.keyGroupRange);
 
 			byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
@@ -949,6 +966,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			@Nonnull
 			private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
 
+			private
+
 			RestoredDBInstance(
 				@Nonnull RocksDB db,
 				@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -1113,10 +1132,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			List<ColumnFamilyDescriptor> columnFamilyDescriptors,
 			List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) throws Exception {
 			// pick up again the old backend id, so the we can reference existing state
-			stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
+			this.restoredBackendUID = restoreStateHandle.getBackendIdentifier();
 
 			LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
-				stateBackend.operatorIdentifier, stateBackend.backendUID);
+				stateBackend.operatorIdentifier, this.restoredBackendUID);
 
 			// create hard links in the instance directory
 			if (!stateBackend.instanceRocksDBPath.mkdirs()) {
@@ -1150,13 +1169,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			// use the restore sst files as the base for succeeding checkpoints
-			synchronized (stateBackend.materializedSstFiles) {
-				stateBackend.materializedSstFiles.put(
+				restoredSstFiles.put(
 					restoreStateHandle.getCheckpointId(),
 					restoreStateHandle.getSharedStateHandleIDs());
-			}
 
-			stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
+			lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
 		}
 
 		/**
@@ -1447,881 +1464,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return count;
 	}
 
-	private class FullSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
-		@Override
-		public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
-			long checkpointId,
-			long timestamp,
-			CheckpointStreamFactory primaryStreamFactory,
-			CheckpointOptions checkpointOptions) throws Exception {
-
-			long startTime = System.currentTimeMillis();
-
-			if (kvStateInformation.isEmpty()) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
-						timestamp);
-				}
-
-				return DoneFuture.of(SnapshotResult.empty());
-			}
-
-			final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
-
-				localRecoveryConfig.isLocalRecoveryEnabled() &&
-					(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType()) ?
-
-					() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
-						checkpointId,
-						CheckpointedStateScope.EXCLUSIVE,
-						primaryStreamFactory,
-						localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
-					() -> CheckpointStreamWithResultProvider.createSimpleStream(
-						CheckpointedStateScope.EXCLUSIVE,
-						primaryStreamFactory);
-
-			final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
-
-			final RocksDBFullSnapshotOperation<K> snapshotOperation =
-				new RocksDBFullSnapshotOperation<>(
-					RocksDBKeyedStateBackend.this,
-					supplier,
-					snapshotCloseableRegistry);
-
-			snapshotOperation.takeDBSnapShot();
-
-			// implementation of the async IO operation, based on FutureTask
-			AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable =
-				new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
-
-					@Override
-					protected void acquireResources() throws Exception {
-						cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-						snapshotOperation.openCheckpointStream();
-					}
-
-					@Override
-					protected void releaseResources() throws Exception {
-						closeLocalRegistry();
-						releaseSnapshotOperationResources();
-					}
-
-					private void releaseSnapshotOperationResources() {
-						// hold the db lock while operation on the db to guard us against async db disposal
-						snapshotOperation.releaseSnapshotResources();
-					}
-
-					@Override
-					protected void stopOperation() throws Exception {
-						closeLocalRegistry();
-					}
-
-					private void closeLocalRegistry() {
-						if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-							try {
-								snapshotCloseableRegistry.close();
-							} catch (Exception ex) {
-								LOG.warn("Error closing local registry", ex);
-							}
-						}
-					}
-
-					@Nonnull
-					@Override
-					public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
-						long startTime = System.currentTimeMillis();
-
-						if (isStopped()) {
-							throw new IOException("RocksDB closed.");
-						}
-
-						snapshotOperation.writeDBSnapshot();
-
-						LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
-							primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-						return snapshotOperation.getSnapshotResultStateHandle();
-					}
-				};
-
-			LOG.debug("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
-				primaryStreamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-			return AsyncStoppableTaskWithCallback.from(ioCallable);
-		}
-	}
-
-	private class IncrementalSnapshotStrategy implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
-
-		private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
-
-		public IncrementalSnapshotStrategy() {
-			this.savepointDelegate = new FullSnapshotStrategy();
-		}
-
-		@Override
-		public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
-			long checkpointId,
-			long checkpointTimestamp,
-			CheckpointStreamFactory checkpointStreamFactory,
-			CheckpointOptions checkpointOptions) throws Exception {
-
-			// for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
-			if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
-				return savepointDelegate.performSnapshot(
-					checkpointId,
-					checkpointTimestamp,
-					checkpointStreamFactory,
-					checkpointOptions);
-			}
-
-			if (db == null) {
-				throw new IOException("RocksDB closed.");
-			}
-
-			if (kvStateInformation.isEmpty()) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp);
-				}
-				return DoneFuture.of(SnapshotResult.empty());
-			}
-
-			SnapshotDirectory snapshotDirectory;
-
-			if (localRecoveryConfig.isLocalRecoveryEnabled()) {
-				// create a "permanent" snapshot directory for local recovery.
-				LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
-				File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
-
-				if (directory.exists()) {
-					FileUtils.deleteDirectory(directory);
-				}
-
-				if (!directory.mkdirs()) {
-					throw new IOException("Local state base directory for checkpoint " + checkpointId +
-						" already exists: " + directory);
-				}
-
-				// introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
-				File rdbSnapshotDir = new File(directory, "rocks_db");
-				Path path = new Path(rdbSnapshotDir.toURI());
-				// create a "permanent" snapshot directory because local recovery is active.
-				snapshotDirectory = SnapshotDirectory.permanent(path);
-			} else {
-				// create a "temporary" snapshot directory because local recovery is inactive.
-				Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
-				snapshotDirectory = SnapshotDirectory.temporary(path);
-			}
-
-			final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
-				new RocksDBIncrementalSnapshotOperation<>(
-					RocksDBKeyedStateBackend.this,
-					checkpointStreamFactory,
-					snapshotDirectory,
-					checkpointId);
-
-			try {
-				snapshotOperation.takeSnapshot();
-			} catch (Exception e) {
-				snapshotOperation.stop();
-				snapshotOperation.releaseResources(true);
-				throw e;
-			}
-
-			return new FutureTask<SnapshotResult<KeyedStateHandle>>(
-				snapshotOperation::runSnapshot
-			) {
-				@Override
-				public boolean cancel(boolean mayInterruptIfRunning) {
-					snapshotOperation.stop();
-					return super.cancel(mayInterruptIfRunning);
-				}
-
-				@Override
-				protected void done() {
-					snapshotOperation.releaseResources(isCancelled());
-				}
-			};
-		}
-	}
-
-	/**
-	 * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend.
-	 */
-	@VisibleForTesting
-	static class RocksDBFullSnapshotOperation<K>
-		extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
-
-		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-		private final RocksDBKeyedStateBackend<K> stateBackend;
-		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-		private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
-		private final CloseableRegistry snapshotCloseableRegistry;
-		private final ResourceGuard.Lease dbLease;
-
-		private Snapshot snapshot;
-		private ReadOptions readOptions;
-
-		/**
-		 * The state meta data.
-		 */
-		private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
-
-		/**
-		 * The copied column handle.
-		 */
-		private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> copiedMeta;
-
-		private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
-
-		private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
-		private DataOutputView outputView;
-
-		RocksDBFullSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
-			CloseableRegistry registry) throws IOException {
-
-			this.stateBackend = stateBackend;
-			this.checkpointStreamSupplier = checkpointStreamSupplier;
-			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-			this.snapshotCloseableRegistry = registry;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
-		}
-
-		/**
-		 * 1) Create a snapshot object from RocksDB.
-		 *
-		 */
-		public void takeDBSnapShot() {
-			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-
-			this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
-
-			this.copiedMeta = new ArrayList<>(stateBackend.kvStateInformation.size());
-
-			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
-				stateBackend.kvStateInformation.values()) {
-				// snapshot meta info
-				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
-				this.copiedMeta.add(tuple2);
-			}
-			this.snapshot = stateBackend.db.getSnapshot();
-		}
-
-		/**
-		 * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
-		 *
-		 * @throws Exception
-		 */
-		public void openCheckpointStream() throws Exception {
-			Preconditions.checkArgument(checkpointStreamWithResultProvider == null,
-				"Output stream for snapshot is already set.");
-
-			checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
-			snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
-			outputView = new DataOutputViewStreamWrapper(
-				checkpointStreamWithResultProvider.getCheckpointOutputStream());
-		}
-
-		/**
-		 * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1).
-		 *
-		 * @throws IOException
-		 */
-		public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
-
-			if (null == snapshot) {
-				throw new IOException("No snapshot available. Might be released due to cancellation.");
-			}
-
-			Preconditions.checkNotNull(checkpointStreamWithResultProvider, "No output stream to write snapshot.");
-			writeKVStateMetaData();
-			writeKVStateData();
-		}
-
-		/**
-		 * 4) Returns a snapshot result for the completed snapshot.
-		 *
-		 * @return snapshot result for the completed snapshot.
-		 */
-		@Nonnull
-		public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {
-
-			if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
-
-				SnapshotResult<StreamStateHandle> res =
-					checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-				checkpointStreamWithResultProvider = null;
-				return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(res, keyGroupRangeOffsets);
-			}
-
-			return SnapshotResult.empty();
-		}
-
-		/**
-		 * 5) Release the snapshot object for RocksDB and clean up.
-		 */
-		public void releaseSnapshotResources() {
-
-			checkpointStreamWithResultProvider = null;
-
-			if (null != kvStateIterators) {
-				for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
-					IOUtils.closeQuietly(kvStateIterator.f0);
-				}
-				kvStateIterators = null;
-			}
-
-			if (null != snapshot) {
-				if (null != stateBackend.db) {
-					stateBackend.db.releaseSnapshot(snapshot);
-				}
-				IOUtils.closeQuietly(snapshot);
-				snapshot = null;
-			}
-
-			if (null != readOptions) {
-				IOUtils.closeQuietly(readOptions);
-				readOptions = null;
-			}
-
-			this.dbLease.close();
-		}
-
-		private void writeKVStateMetaData() throws IOException {
-
-			this.kvStateIterators = new ArrayList<>(copiedMeta.size());
-
-			int kvStateId = 0;
-
-			//retrieve iterator for this k/v states
-			readOptions = new ReadOptions();
-			readOptions.setSnapshot(snapshot);
-
-			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : copiedMeta) {
-				RocksIteratorWrapper rocksIteratorWrapper =
-					getRocksIterator(stateBackend.db, tuple2.f0, tuple2.f1, readOptions);
-				kvStateIterators.add(new Tuple2<>(rocksIteratorWrapper, kvStateId));
-				++kvStateId;
-			}
-
-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(
-					// TODO: this code assumes that writing a serializer is threadsafe, we should support to
-					// get a serialized form already at state registration time in the future
-					stateBackend.getKeySerializer(),
-					stateMetaInfoSnapshots,
-					!Objects.equals(
-						UncompressedStreamCompressionDecorator.INSTANCE,
-						stateBackend.keyGroupCompressionDecorator));
-
-			serializationProxy.write(outputView);
-		}
-
-		private void writeKVStateData() throws IOException, InterruptedException {
-			byte[] previousKey = null;
-			byte[] previousValue = null;
-			DataOutputView kgOutView = null;
-			OutputStream kgOutStream = null;
-			CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
-				checkpointStreamWithResultProvider.getCheckpointOutputStream();
-
-			try {
-				// Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
-				try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
-					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
-
-					// handover complete, null out to prevent double close
-					kvStateIterators = null;
-
-					//preamble: setup with first key-group as our lookahead
-					if (mergeIterator.isValid()) {
-						//begin first key-group by recording the offset
-						keyGroupRangeOffsets.setKeyGroupOffset(
-							mergeIterator.keyGroup(),
-							checkpointOutputStream.getPos());
-						//write the k/v-state id as metadata
-						kgOutStream = stateBackend.keyGroupCompressionDecorator.
-							decorateWithCompression(checkpointOutputStream);
-						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutView.writeShort(mergeIterator.kvStateId());
-						previousKey = mergeIterator.key();
-						previousValue = mergeIterator.value();
-						mergeIterator.next();
-					}
-
-					//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
-					while (mergeIterator.isValid()) {
-
-						assert (!hasMetaDataFollowsFlag(previousKey));
-
-						//set signal in first key byte that meta data will follow in the stream after this k/v pair
-						if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
-
-							//be cooperative and check for interruption from time to time in the hot loop
-							checkInterrupted();
-
-							setMetaDataFollowsFlagInKey(previousKey);
-						}
-
-						writeKeyValuePair(previousKey, previousValue, kgOutView);
-
-						//write meta data if we have to
-						if (mergeIterator.isNewKeyGroup()) {
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-							// this will just close the outer stream
-							kgOutStream.close();
-							//begin new key-group
-							keyGroupRangeOffsets.setKeyGroupOffset(
-								mergeIterator.keyGroup(),
-								checkpointOutputStream.getPos());
-							//write the kev-state
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutStream = stateBackend.keyGroupCompressionDecorator.
-								decorateWithCompression(checkpointOutputStream);
-							kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
-							kgOutView.writeShort(mergeIterator.kvStateId());
-						} else if (mergeIterator.isNewKeyValueState()) {
-							//write the k/v-state
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutView.writeShort(mergeIterator.kvStateId());
-						}
-
-						//request next k/v pair
-						previousKey = mergeIterator.key();
-						previousValue = mergeIterator.value();
-						mergeIterator.next();
-					}
-				}
-
-				//epilogue: write last key-group
-				if (previousKey != null) {
-					assert (!hasMetaDataFollowsFlag(previousKey));
-					setMetaDataFollowsFlagInKey(previousKey);
-					writeKeyValuePair(previousKey, previousValue, kgOutView);
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-					// this will just close the outer stream
-					kgOutStream.close();
-					kgOutStream = null;
-				}
-
-			} finally {
-				// this will just close the outer stream
-				IOUtils.closeQuietly(kgOutStream);
-			}
-		}
-
-		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
-			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
-			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
-		}
-
-		static void setMetaDataFollowsFlagInKey(byte[] key) {
-			key[0] |= FIRST_BIT_IN_BYTE_MASK;
-		}
-
-		static void clearMetaDataFollowsFlag(byte[] key) {
-			key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-		}
-
-		static boolean hasMetaDataFollowsFlag(byte[] key) {
-			return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-		}
-
-		private static void checkInterrupted() throws InterruptedException {
-			if (Thread.currentThread().isInterrupted()) {
-				throw new InterruptedException("RocksDB snapshot interrupted.");
-			}
-		}
-
-		@Override
-		protected void acquireResources() throws Exception {
-			stateBackend.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-			openCheckpointStream();
-		}
-
-		@Override
-		protected void releaseResources() {
-			closeLocalRegistry();
-			releaseSnapshotOperationResources();
-		}
-
-		private void releaseSnapshotOperationResources() {
-			// hold the db lock while operation on the db to guard us against async db disposal
-			releaseSnapshotResources();
-		}
-
-		@Override
-		protected void stopOperation() {
-			closeLocalRegistry();
-		}
-
-		private void closeLocalRegistry() {
-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-				try {
-					snapshotCloseableRegistry.close();
-				} catch (Exception ex) {
-					LOG.warn("Error closing local registry", ex);
-				}
-			}
-		}
-
-		@Nonnull
-		@Override
-		public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
-			long startTime = System.currentTimeMillis();
-
-			if (isStopped()) {
-				throw new IOException("RocksDB closed.");
-			}
-
-			writeDBSnapshot();
-
-			LOG.debug("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
-				checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-			return getSnapshotResultStateHandle();
-		}
-	}
-
-	/**
-	 * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
-	 */
-	private static final class RocksDBIncrementalSnapshotOperation<K> {
-
-		/** The backend which we snapshot. */
-		private final RocksDBKeyedStateBackend<K> stateBackend;
-
-		/** Stream factory that creates the outpus streams to DFS. */
-		private final CheckpointStreamFactory checkpointStreamFactory;
-
-		/** Id for the current checkpoint. */
-		private final long checkpointId;
-
-		/** All sst files that were part of the last previously completed checkpoint. */
-		private Set<StateHandleID> baseSstFiles;
-
-		/** The state meta data. */
-		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
-
-		/** Local directory for the RocksDB native backup. */
-		private SnapshotDirectory localBackupDirectory;
-
-		// Registry for all opened i/o streams
-		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
-
-		// new sst files since the last completed checkpoint
-		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
-
-		// handles to the misc files in the current snapshot
-		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
-
-		// This lease protects from concurrent disposal of the native rocksdb instance.
-		private final ResourceGuard.Lease dbLease;
-
-		private SnapshotResult<StreamStateHandle> metaStateHandle = null;
-
-		private RocksDBIncrementalSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory,
-			SnapshotDirectory localBackupDirectory,
-			long checkpointId) throws IOException {
-
-			this.stateBackend = stateBackend;
-			this.checkpointStreamFactory = checkpointStreamFactory;
-			this.checkpointId = checkpointId;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
-			this.localBackupDirectory = localBackupDirectory;
-		}
-
-		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
-			FSDataInputStream inputStream = null;
-			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
-
-			try {
-				final byte[] buffer = new byte[8 * 1024];
-
-				FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
-				inputStream = backupFileSystem.open(filePath);
-				closeableRegistry.registerCloseable(inputStream);
-
-				outputStream = checkpointStreamFactory
-					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-				closeableRegistry.registerCloseable(outputStream);
-
-				while (true) {
-					int numBytes = inputStream.read(buffer);
-
-					if (numBytes == -1) {
-						break;
-					}
-
-					outputStream.write(buffer, 0, numBytes);
-				}
-
-				StreamStateHandle result = null;
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					result = outputStream.closeAndGetHandle();
-					outputStream = null;
-				}
-				return result;
-
-			} finally {
-
-				if (closeableRegistry.unregisterCloseable(inputStream)) {
-					inputStream.close();
-				}
-
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					outputStream.close();
-				}
-			}
-		}
-
-		@Nonnull
-		private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
-
-			LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig;
-
-			CheckpointStreamWithResultProvider streamWithResultProvider =
-
-				localRecoveryConfig.isLocalRecoveryEnabled() ?
-
-					CheckpointStreamWithResultProvider.createDuplicatingStream(
-						checkpointId,
-						CheckpointedStateScope.EXCLUSIVE,
-						checkpointStreamFactory,
-						localRecoveryConfig.getLocalStateDirectoryProvider()) :
-
-					CheckpointStreamWithResultProvider.createSimpleStream(
-						CheckpointedStateScope.EXCLUSIVE,
-						checkpointStreamFactory);
-
-			try {
-				closeableRegistry.registerCloseable(streamWithResultProvider);
-
-				//no need for compression scheme support because sst-files are already compressed
-				KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(
-						stateBackend.keySerializer,
-						stateMetaInfoSnapshots,
-						false);
-
-				DataOutputView out =
-					new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
-
-				serializationProxy.write(out);
-
-				if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
-					SnapshotResult<StreamStateHandle> result =
-						streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
-					streamWithResultProvider = null;
-					return result;
-				} else {
-					throw new IOException("Stream already closed and cannot return a handle.");
-				}
-			} finally {
-				if (streamWithResultProvider != null) {
-					if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
-						IOUtils.closeQuietly(streamWithResultProvider);
-					}
-				}
-			}
-		}
-
-		void takeSnapshot() throws Exception {
-
-			final long lastCompletedCheckpoint;
-
-			// use the last completed checkpoint as the comparison base.
-			synchronized (stateBackend.materializedSstFiles) {
-				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
-				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-			}
-
-			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
-				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
-			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
-				: stateBackend.kvStateInformation.entrySet()) {
-				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-			}
-
-			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);
-
-			if (localBackupDirectory.exists()) {
-				throw new IllegalStateException("Unexpected existence of the backup directory.");
-			}
-
-			// create hard links of living files in the snapshot path
-			try (Checkpoint checkpoint = Checkpoint.create(stateBackend.db)) {
-				checkpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
-			}
-		}
-
-		@Nonnull
-		SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
-
-			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
-
-			// write meta data
-			metaStateHandle = materializeMetaData();
-
-			// sanity checks - they should never fail
-			Preconditions.checkNotNull(metaStateHandle,
-				"Metadata was not properly created.");
-			Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
-				"Metadata for job manager was not properly created.");
-
-			// write state data
-			Preconditions.checkState(localBackupDirectory.exists());
-
-			FileStatus[] fileStatuses = localBackupDirectory.listStatus();
-			if (fileStatuses != null) {
-				for (FileStatus fileStatus : fileStatuses) {
-					final Path filePath = fileStatus.getPath();
-					final String fileName = filePath.getName();
-					final StateHandleID stateHandleID = new StateHandleID(fileName);
-
-					if (fileName.endsWith(SST_FILE_SUFFIX)) {
-						final boolean existsAlready =
-							baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
-						if (existsAlready) {
-							// we introduce a placeholder state handle, that is replaced with the
-							// original from the shared state registry (created from a previous checkpoint)
-							sstFiles.put(
-								stateHandleID,
-								new PlaceholderStreamStateHandle());
-						} else {
-							sstFiles.put(stateHandleID, materializeStateData(filePath));
-						}
-					} else {
-						StreamStateHandle fileHandle = materializeStateData(filePath);
-						miscFiles.put(stateHandleID, fileHandle);
-					}
-				}
-			}
-
-			synchronized (stateBackend.materializedSstFiles) {
-				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-			}
-
-			IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
-				stateBackend.backendUID,
-				stateBackend.keyGroupRange,
-				checkpointId,
-				sstFiles,
-				miscFiles,
-				metaStateHandle.getJobManagerOwnedSnapshot());
-
-			StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
-			DirectoryStateHandle directoryStateHandle = null;
-
-			try {
-
-				directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
-			} catch (IOException ex) {
-
-				Exception collector = ex;
-
-				try {
-					taskLocalSnapshotMetaDataStateHandle.discardState();
-				} catch (Exception discardEx) {
-					collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
-				}
-
-				LOG.warn("Problem with local state snapshot.", collector);
-			}
-
-			if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
-
-				IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
-					new IncrementalLocalKeyedStateHandle(
-						stateBackend.backendUID,
-						checkpointId,
-						directoryStateHandle,
-						stateBackend.keyGroupRange,
-						taskLocalSnapshotMetaDataStateHandle,
-						sstFiles.keySet());
-				return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
-			} else {
-				return SnapshotResult.of(jmIncrementalKeyedStateHandle);
-			}
-		}
-
-		void stop() {
-
-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Could not properly close io streams.", e);
-				}
-			}
-		}
-
-		void releaseResources(boolean canceled) {
-
-			dbLease.close();
-
-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Exception on closing registry.", e);
-				}
-			}
-
-			try {
-				if (localBackupDirectory.exists()) {
-					LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
-					boolean cleanupOk = localBackupDirectory.cleanup();
-
-					if (!cleanupOk) {
-						LOG.debug("Could not properly cleanup local RocksDB backup directory.");
-					}
-				}
-			} catch (IOException e) {
-				LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
-			}
-
-			if (canceled) {
-				Collection<StateObject> statesToDiscard =
-					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
-
-				statesToDiscard.add(metaStateHandle);
-				statesToDiscard.addAll(miscFiles.values());
-				statesToDiscard.addAll(sstFiles.values());
-
-				try {
-					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard states.", e);
-				}
-
-				if (localBackupDirectory.isSnapshotCompleted()) {
-					try {
-						DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
-						if (directoryStateHandle != null) {
-							directoryStateHandle.discardState();
-						}
-					} catch (Exception e) {
-						LOG.warn("Could not properly discard local state.", e);
-					}
-				}
-			}
-		}
-	}
-
 	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
 		return new RocksIteratorWrapper(db.newIterator());
 	}
@@ -2332,23 +1474,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
 	}
 
-	@SuppressWarnings("unchecked")
-	private static RocksIteratorWrapper getRocksIterator(
-		RocksDB db,
-		ColumnFamilyHandle columnFamilyHandle,
-		RegisteredStateMetaInfoBase metaInfo,
-		ReadOptions readOptions) {
-		StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
-		if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
-			stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
-				((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
-		}
-		RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
-		return stateSnapshotTransformer == null ?
-			new RocksIteratorWrapper(rocksIterator) :
-			new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
-	}
-
 	/**
 	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
 	 */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
new file mode 100644
index 0000000..0cc9729
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
+import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
+
+/**
+ * Snapshot strategy to create full snapshots of
+ * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. Iterates and writes all states from a
+ * RocksDB snapshot of the column families.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksFullSnapshotStrategy<K> extends SnapshotStrategyBase<K> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksFullSnapshotStrategy.class);
+
+	/** This decorator is used to apply compression per key-group for the written snapshot data. */
+	@Nonnull
+	private final StreamCompressionDecorator keyGroupCompressionDecorator;
+
+	/**
+	 * Creates a full snapshot strategy.
+	 *
+	 * <p>All arguments except {@code keyGroupCompressionDecorator} are forwarded unchanged to the
+	 * {@link SnapshotStrategyBase} constructor; the decorator is kept in this class.
+	 *
+	 * @param keyGroupCompressionDecorator decorator stored for applying per-key-group compression to the
+	 *                                     written snapshot data.
+	 */
+	public RocksFullSnapshotStrategy(
+		@Nonnull RocksDB db,
+		@Nonnull ResourceGuard rocksDBResourceGuard,
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnull LocalRecoveryConfig localRecoveryConfig,
+		@Nonnull CloseableRegistry cancelStreamRegistry,
+		@Nonnull StreamCompressionDecorator keyGroupCompressionDecorator) {
+		super(
+			db,
+			rocksDBResourceGuard,
+			keySerializer,
+			kvStateInformation,
+			keyGroupRange,
+			keyGroupPrefixBytes,
+			localRecoveryConfig,
+			cancelStreamRegistry);
+
+		this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
+	}
+
+	/**
+	 * Prepares a full snapshot and returns it as a runnable future.
+	 *
+	 * <p>If no k/v state is registered, an already completed future with an empty result is returned.
+	 * Otherwise a {@link RocksDBFullSnapshotCallable} is created and wrapped into a {@link SnapshotTask};
+	 * the writing of the snapshot happens when the returned future is run.
+	 *
+	 * @param checkpointId id of the checkpoint, used when a duplicating (local + primary) stream is created.
+	 * @param timestamp checkpoint timestamp, only used for logging here.
+	 * @param primaryStreamFactory factory for the primary checkpoint output stream.
+	 * @param checkpointOptions options of the checkpoint; savepoints never use the duplicating stream.
+	 * @return a future that produces the snapshot result when run.
+	 * @throws Exception if creating the snapshot callable fails.
+	 */
+	@Override
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+		long checkpointId,
+		long timestamp,
+		CheckpointStreamFactory primaryStreamFactory,
+		CheckpointOptions checkpointOptions) throws Exception {
+
+		if (kvStateInformation.isEmpty()) {
+			// parameterized logging already avoids the formatting cost when debug is disabled
+			LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning empty result.",
+				timestamp);
+
+			return DoneFuture.of(SnapshotResult.empty());
+		}
+
+		// local (duplicated) state is only written for checkpoints, never for savepoints
+		final boolean writeLocalCopy = localRecoveryConfig.isLocalRecoveryEnabled() &&
+			(CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType());
+
+		final SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier =
+
+			writeLocalCopy ?
+
+				() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
+					checkpointId,
+					CheckpointedStateScope.EXCLUSIVE,
+					primaryStreamFactory,
+					localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+				() -> CheckpointStreamWithResultProvider.createSimpleStream(
+					CheckpointedStateScope.EXCLUSIVE,
+					primaryStreamFactory);
+
+		final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
+
+		final RocksDBFullSnapshotCallable snapshotOperation =
+			new RocksDBFullSnapshotCallable(supplier, snapshotCloseableRegistry);
+
+		return new SnapshotTask(snapshotOperation);
+	}
+
+	/**
+	 * Intentionally a no-op in this strategy.
+	 *
+	 * @param checkpointId id of the completed checkpoint (ignored).
+	 */
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) {
+		// nothing to do.
+	}
+
+	/**
+	 * {@link FutureTask} that executes a {@link RocksDBFullSnapshotCallable} and closes that callable
+	 * whenever the task gets cancelled.
+	 */
+	private class SnapshotTask extends FutureTask<SnapshotResult<KeyedStateHandle>> {
+
+		/** The wrapped callable, viewed as a closeable so that it can be closed on cancellation. */
+		@Nonnull
+		private final AutoCloseable cancellationHook;
+
+		SnapshotTask(@Nonnull RocksDBFullSnapshotCallable snapshotCallable) {
+			super(snapshotCallable);
+			this.cancellationHook = snapshotCallable;
+		}
+
+		@Override
+		public boolean cancel(boolean mayInterruptIfRunning) {
+			// close the callable first, then let the future perform the actual cancellation
+			IOUtils.closeQuietly(cancellationHook);
+			final boolean cancelled = super.cancel(mayInterruptIfRunning);
+			return cancelled;
+		}
+	}
+
+	/**
+	 * Encapsulates the process to perform a full snapshot of a RocksDBKeyedStateBackend.
+	 */
+	@VisibleForTesting
+	private class RocksDBFullSnapshotCallable implements Callable<SnapshotResult<KeyedStateHandle>>, AutoCloseable {
+
+		@Nonnull
+		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
+
+		@Nonnull
+		private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
+
+		@Nonnull
+		private final CloseableRegistry snapshotCloseableRegistry;
+
+		@Nonnull
+		private final ResourceGuard.Lease dbLease;
+
+		@Nonnull
+		private final Snapshot snapshot;
+
+		@Nonnull
+		private final ReadOptions readOptions;
+
+		/**
+		 * The state meta data.
+		 */
+		@Nonnull
+		private List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+		/**
+		 * The copied column handle.
+		 */
+		@Nonnull
+		private List<Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> metaDataCopy;
+
+		private final AtomicBoolean ownedForCleanup;
+
+		/**
+		 * Performs the synchronous part of the snapshot: copies the current state meta data, acquires a lease
+		 * on the RocksDB resource guard and takes a RocksDB snapshot that is attached to fresh read options.
+		 *
+		 * <p>If obtaining the snapshot or read options fails, everything acquired so far is released again
+		 * before the failure is propagated, because no instance exists that could clean up later.
+		 *
+		 * @param checkpointStreamSupplier supplier for the stream the snapshot is written to.
+		 * @param registry registry that tracks the streams opened for this snapshot.
+		 * @throws IOException if the lease on the RocksDB resource guard cannot be acquired.
+		 */
+		RocksDBFullSnapshotCallable(
+			@Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier,
+			@Nonnull CloseableRegistry registry) throws IOException {
+
+			this.ownedForCleanup = new AtomicBoolean(false);
+			this.checkpointStreamSupplier = checkpointStreamSupplier;
+			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange);
+			this.snapshotCloseableRegistry = registry;
+
+			this.stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
+			this.metaDataCopy = new ArrayList<>(kvStateInformation.size());
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : kvStateInformation.values()) {
+				// snapshot meta info
+				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+				this.metaDataCopy.add(tuple2);
+			}
+
+			final ResourceGuard.Lease lease = rocksDBResourceGuard.acquireResource();
+			ReadOptions snapshotReadOptions = null;
+			Snapshot dbSnapshot = null;
+
+			try {
+				snapshotReadOptions = new ReadOptions();
+				dbSnapshot = db.getSnapshot();
+				snapshotReadOptions.setSnapshot(dbSnapshot);
+			} catch (RuntimeException | Error e) {
+				// same release order as in cleanupSynchronousStepResources()
+				if (snapshotReadOptions != null) {
+					IOUtils.closeQuietly(snapshotReadOptions);
+				}
+				if (dbSnapshot != null) {
+					db.releaseSnapshot(dbSnapshot);
+					IOUtils.closeQuietly(dbSnapshot);
+				}
+				IOUtils.closeQuietly(lease);
+				throw e;
+			}
+
+			this.dbLease = lease;
+			this.readOptions = snapshotReadOptions;
+			this.snapshot = dbSnapshot;
+		}
+
+		/**
+		 * Runs the asynchronous part of the snapshot: opens the checkpoint stream, writes the state meta data
+		 * followed by the k/v state data, and converts the finished stream into the snapshot result.
+		 *
+		 * <p>All iterators created during the run and the resources obtained in the constructor are released
+		 * in the {@code finally} block, whether or not writing succeeded.
+		 *
+		 * @return the result of the completed snapshot.
+		 * @throws CancellationException if {@code ownedForCleanup} was already set when this method started.
+		 * @throws Exception if opening the stream or writing the snapshot fails.
+		 */
+		@Override
+		public SnapshotResult<KeyedStateHandle> call() throws Exception {
+
+			// claim ownership of the cleanup; if the flag was already set, somebody else got there first
+			if (!ownedForCleanup.compareAndSet(false, true)) {
+				throw new CancellationException("Snapshot task was already cancelled, stopping execution.");
+			}
+
+			final long startTime = System.currentTimeMillis();
+			final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators = new ArrayList<>(metaDataCopy.size());
+
+			try {
+
+				// make the per-snapshot registry reachable from the backend-wide cancellation registry
+				cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
+
+				final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = checkpointStreamSupplier.get();
+				snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
+
+				final DataOutputView outputView =
+					new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
+
+				writeKVStateMetaData(kvStateIterators, outputView);
+				writeKVStateData(kvStateIterators, checkpointStreamWithResultProvider);
+
+				final SnapshotResult<KeyedStateHandle> snapshotResult =
+					createStateHandlesFromStreamProvider(checkpointStreamWithResultProvider);
+
+				LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
+					checkpointStreamSupplier, Thread.currentThread(), (System.currentTimeMillis() - startTime));
+
+				return snapshotResult;
+
+			} finally {
+
+				// close whatever iterators are still held in the list
+				for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) {
+					IOUtils.closeQuietly(kvStateIterator.f0);
+				}
+
+				cleanupSynchronousStepResources();
+			}
+		}
+
+		/**
+		 * Releases what the constructor acquired (read options, RocksDB snapshot, lease on the RocksDB
+		 * instance) and closes the snapshot's closeable registry if it is still registered with
+		 * {@code cancelStreamRegistry}. Failures are swallowed or logged, never thrown.
+		 */
+		private void cleanupSynchronousStepResources() {
+			IOUtils.closeQuietly(readOptions);
+
+			// hand the snapshot back to the database before closing the snapshot object itself
+			db.releaseSnapshot(snapshot);
+			IOUtils.closeQuietly(snapshot);
+
+			// the lease is closed only after the native resources above are released
+			IOUtils.closeQuietly(dbLease);
+
+			// only whoever successfully unregisters the registry closes it
+			if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+				try {
+					snapshotCloseableRegistry.close();
+				} catch (Exception ex) {
+					LOG.warn("Error closing local registry", ex);
+				}
+			}
+		}
+
+		private SnapshotResult<KeyedStateHandle> createStateHandlesFromStreamProvider(
+			CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException {
+			if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
+				return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
+					checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
+					keyGroupRangeOffsets);
+			} else {
+				throw new IOException("Snapshot was already closed before completion.");
+			}
+		}
+
+		private void writeKVStateMetaData(
+			final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+			final DataOutputView outputView) throws IOException {
+
+			int kvStateId = 0;
+
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 : metaDataCopy) {
+
+				RocksIteratorWrapper rocksIteratorWrapper =
+					getRocksIterator(db, tuple2.f0, tuple2.f1, readOptions);
+
+				kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
+				++kvStateId;
+			}
+
+			KeyedBackendSerializationProxy<K> serializationProxy =
+				new KeyedBackendSerializationProxy<>(
+					// TODO: this code assumes that writing a serializer is threadsafe, we should support to
+					// get a serialized form already at state registration time in the future
+					keySerializer,
+					stateMetaInfoSnapshots,
+					!Objects.equals(
+						UncompressedStreamCompressionDecorator.INSTANCE,
+						keyGroupCompressionDecorator));
+
+			serializationProxy.write(outputView);
+		}
+
+		/**
+		 * Writes all key/value pairs to the checkpoint stream, ordered by (key-group, k/v-state) as
+		 * delivered by the merge iterator, and records the stream offset at which every key-group starts.
+		 *
+		 * <p>The loop always holds back one pair ({@code previousKey}/{@code previousValue}): a pair is
+		 * written only after the next one is known, because the flag in its first key byte must announce
+		 * whether meta data (a new k/v-state id or the end-of-key-group mark) follows it in the stream.
+		 *
+		 * @param kvStateIterators the iterators to merge; they are handed over to the merge iterator.
+		 * @param checkpointStreamWithResultProvider provider of the stream that the data is written to.
+		 * @throws InterruptedException if the executing thread is found interrupted.
+		 */
+		private void writeKVStateData(
+			final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+			final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider) throws IOException, InterruptedException {
+
+			byte[] previousKey = null;
+			byte[] previousValue = null;
+			DataOutputView kgOutView = null;
+			OutputStream kgOutStream = null;
+			CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
+				checkpointStreamWithResultProvider.getCheckpointOutputStream();
+
+			try {
+				// Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
+				try (RocksStatesPerKeyGroupMergeIterator mergeIterator = new RocksStatesPerKeyGroupMergeIterator(
+					kvStateIterators, keyGroupPrefixBytes)) {
+
+					//preamble: setup with first key-group as our lookahead
+					if (mergeIterator.isValid()) {
+						//begin first key-group by recording the offset
+						keyGroupRangeOffsets.setKeyGroupOffset(
+							mergeIterator.keyGroup(),
+							checkpointOutputStream.getPos());
+						//open the (possibly compressing) stream for this key-group and write the k/v-state id as metadata
+						kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+						kgOutView.writeShort(mergeIterator.kvStateId());
+						previousKey = mergeIterator.key();
+						previousValue = mergeIterator.value();
+						mergeIterator.next();
+					}
+
+					//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+					while (mergeIterator.isValid()) {
+
+						assert (!hasMetaDataFollowsFlag(previousKey));
+
+						//set signal in first key byte that meta data will follow in the stream after this k/v pair
+						if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
+
+							//be cooperative and check for interruption from time to time in the hot loop
+							checkInterrupted();
+
+							setMetaDataFollowsFlagInKey(previousKey);
+						}
+
+						writeKeyValuePair(previousKey, previousValue, kgOutView);
+
+						//write meta data if we have to
+						if (mergeIterator.isNewKeyGroup()) {
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+							// this will just close the outer stream
+							kgOutStream.close();
+							//begin new key-group
+							keyGroupRangeOffsets.setKeyGroupOffset(
+								mergeIterator.keyGroup(),
+								checkpointOutputStream.getPos());
+							//open the stream for the new key-group and write the k/v-state id
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
+							kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
+							kgOutView.writeShort(mergeIterator.kvStateId());
+						} else if (mergeIterator.isNewKeyValueState()) {
+							//write the k/v-state
+							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+							kgOutView.writeShort(mergeIterator.kvStateId());
+						}
+
+						//request next k/v pair
+						previousKey = mergeIterator.key();
+						previousValue = mergeIterator.value();
+						mergeIterator.next();
+					}
+				}
+
+				//epilogue: write last key-group
+				if (previousKey != null) {
+					assert (!hasMetaDataFollowsFlag(previousKey));
+					//the very last pair is always followed by the end-of-key-group mark
+					setMetaDataFollowsFlagInKey(previousKey);
+					writeKeyValuePair(previousKey, previousValue, kgOutView);
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+					// this will just close the outer stream
+					kgOutStream.close();
+					//already closed, nothing left for the finally block to do
+					kgOutStream = null;
+				}
+
+			} finally {
+				// this will just close the outer stream
+				IOUtils.closeQuietly(kgOutStream);
+			}
+		}
+
+		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
+			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
+			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
+		}
+
+		private void checkInterrupted() throws InterruptedException {
+			if (Thread.currentThread().isInterrupted()) {
+				throw new InterruptedException("RocksDB snapshot interrupted.");
+			}
+		}
+
+		/**
+		 * Cancels the snapshot. If {@link #call()} has not claimed the cleanup yet, the resources acquired
+		 * in the constructor are released here; in every case the snapshot's closeable registry is closed
+		 * if it is still registered with {@code cancelStreamRegistry}.
+		 */
+		@Override
+		public void close() throws Exception {
+
+			// succeeds only if call() has not started; otherwise call() performs the cleanup itself
+			if (ownedForCleanup.compareAndSet(false, true)) {
+				cleanupSynchronousStepResources();
+			}
+
+			// if call() is in progress, this closes the streams it registered
+			if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
+				snapshotCloseableRegistry.close();
+			}
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static RocksIteratorWrapper getRocksIterator(
+		RocksDB db,
+		ColumnFamilyHandle columnFamilyHandle,
+		RegisteredStateMetaInfoBase metaInfo,
+		ReadOptions readOptions) {
+		StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
+		if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+			stateSnapshotTransformer = (StateSnapshotTransformer<byte[]>)
+				((RegisteredKeyValueStateBackendMetaInfo<?, ?>) metaInfo).getSnapshotTransformer();
+		}
+		RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
+		return stateSnapshotTransformer == null ?
+			new RocksIteratorWrapper(rocksIterator) :
+			new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
new file mode 100644
index 0000000..3487fe6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotDirectory;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.Checkpoint;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
+
+/**
+ * Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend} that is based
+ * on RocksDB's native checkpoints and creates incremental snapshots.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class RocksIncrementalSnapshotStrategy<K> extends SnapshotStrategyBase<K> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksIncrementalSnapshotStrategy.class);
+
+	/** Base path of the RocksDB instance. */
+	@Nonnull
+	private final File instanceBasePath;
+
+	/** Unique identifier of the backend; it is passed to the incremental state handles created by this strategy. */
+	@Nonnull
+	private final UUID backendUID;
+
+	/** Stores the materialized sstable files from all snapshots that build the incremental history. */
+	@Nonnull
+	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
+
+	/** The identifier of the last completed checkpoint. */
+	private long lastCompletedCheckpointId;
+
+	/** We delegate snapshots that are for savepoints to this. */
+	@Nonnull
+	private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;
+
+	/**
+	 * Creates the incremental snapshot strategy.
+	 *
+	 * @param db forwarded to the base class.
+	 * @param rocksDBResourceGuard forwarded to the base class.
+	 * @param keySerializer forwarded to the base class.
+	 * @param kvStateInformation forwarded to the base class.
+	 * @param keyGroupRange forwarded to the base class.
+	 * @param keyGroupPrefixBytes forwarded to the base class.
+	 * @param localRecoveryConfig forwarded to the base class.
+	 * @param cancelStreamRegistry forwarded to the base class.
+	 * @param instanceBasePath base path of the RocksDB instance.
+	 * @param backendUID identifier that is written into the created state handles.
+	 * @param materializedSstFiles sst files per checkpoint id; also used as the lock that guards
+	 *                             {@code lastCompletedCheckpointId}.
+	 * @param lastCompletedCheckpointId initial value for the id of the last completed checkpoint.
+	 * @param savepointDelegate strategy that performs the snapshot when a savepoint is requested.
+	 */
+	public RocksIncrementalSnapshotStrategy(
+		@Nonnull RocksDB db,
+		@Nonnull ResourceGuard rocksDBResourceGuard,
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnull LocalRecoveryConfig localRecoveryConfig,
+		@Nonnull CloseableRegistry cancelStreamRegistry,
+		@Nonnull File instanceBasePath,
+		@Nonnull UUID backendUID,
+		@Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
+		long lastCompletedCheckpointId,
+		@Nonnull SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate) {
+
+		super(
+			db,
+			rocksDBResourceGuard,
+			keySerializer,
+			kvStateInformation,
+			keyGroupRange,
+			keyGroupPrefixBytes,
+			localRecoveryConfig,
+			cancelStreamRegistry);
+
+		this.instanceBasePath = instanceBasePath;
+		this.backendUID = backendUID;
+		this.materializedSstFiles = materializedSstFiles;
+		this.lastCompletedCheckpointId = lastCompletedCheckpointId;
+		this.savepointDelegate = savepointDelegate;
+	}
+
+	/**
+	 * Triggers a snapshot. Savepoints are handed to the savepoint delegate. For checkpoints, a native
+	 * RocksDB checkpoint is taken synchronously into a local snapshot directory, and the returned
+	 * future uploads its files when it is run.
+	 *
+	 * @param checkpointId id of the checkpoint.
+	 * @param checkpointTimestamp timestamp of the checkpoint (used for logging only in this method).
+	 * @param checkpointStreamFactory factory for the streams that the snapshot is written to.
+	 * @param checkpointOptions options; the checkpoint type decides whether the delegate is used.
+	 * @return a future that produces the snapshot result, already done if there is no keyed state.
+	 * @throws Exception if preparing the local directory or taking the native checkpoint fails.
+	 */
+	@Override
+	public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(
+		long checkpointId,
+		long checkpointTimestamp,
+		CheckpointStreamFactory checkpointStreamFactory,
+		CheckpointOptions checkpointOptions) throws Exception {
+
+		// for savepoints, we delegate to the full snapshot strategy because savepoints are always self-contained.
+		if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
+			return savepointDelegate.performSnapshot(
+				checkpointId,
+				checkpointTimestamp,
+				checkpointStreamFactory,
+				checkpointOptions);
+		}
+
+		if (kvStateInformation.isEmpty()) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", checkpointTimestamp);
+			}
+			return DoneFuture.of(SnapshotResult.empty());
+		}
+
+		final SnapshotDirectory snapshotDirectory = prepareSnapshotDirectory(checkpointId);
+
+		final RocksDBIncrementalSnapshotOperation snapshotOperation =
+			new RocksDBIncrementalSnapshotOperation(
+				checkpointStreamFactory,
+				snapshotDirectory,
+				checkpointId);
+
+		try {
+			snapshotOperation.takeSnapshot();
+		} catch (Exception e) {
+			// the synchronous part failed: close open streams and discard everything created so far
+			snapshotOperation.stop();
+			snapshotOperation.releaseResources(true);
+			throw e;
+		}
+
+		return new FutureTask<SnapshotResult<KeyedStateHandle>>(
+			snapshotOperation::runSnapshot
+		) {
+			@Override
+			public boolean cancel(boolean mayInterruptIfRunning) {
+				// close the streams first so that a running upload is unblocked
+				snapshotOperation.stop();
+				return super.cancel(mayInterruptIfRunning);
+			}
+
+			@Override
+			protected void done() {
+				snapshotOperation.releaseResources(isCancelled());
+			}
+		};
+	}
+
+	/**
+	 * Chooses the local directory that receives the native RocksDB checkpoint: a permanent directory
+	 * below the local recovery directory if local recovery is enabled, a temporary one below the
+	 * instance base path otherwise.
+	 *
+	 * @param checkpointId id of the checkpoint the directory is created for.
+	 * @return the snapshot directory; the directory itself does not exist yet.
+	 * @throws IOException if the parent directory for local recovery cannot be cleaned or created.
+	 */
+	@Nonnull
+	private SnapshotDirectory prepareSnapshotDirectory(long checkpointId) throws IOException {
+
+		if (!localRecoveryConfig.isLocalRecoveryEnabled()) {
+			// create a "temporary" snapshot directory because local recovery is inactive.
+			Path path = new Path(instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+			return SnapshotDirectory.temporary(path);
+		}
+
+		LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();
+		File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
+
+		// remove leftovers of an earlier attempt for the same checkpoint id
+		if (directory.exists()) {
+			FileUtils.deleteDirectory(directory);
+		}
+
+		// mkdirs() returns false when the directory was not created; at this point that is a creation
+		// failure, not a pre-existing directory, because any old directory was deleted above.
+		if (!directory.mkdirs()) {
+			throw new IOException("Local state base directory for checkpoint " + checkpointId +
+				" could not be created: " + directory);
+		}
+
+		// introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints.
+		File rdbSnapshotDir = new File(directory, "rocks_db");
+		Path path = new Path(rdbSnapshotDir.toURI());
+		// create a "permanent" snapshot directory because local recovery is active.
+		return SnapshotDirectory.permanent(path);
+	}
+
+	/**
+	 * Records the given checkpoint as the last completed one and forgets the sst file sets of all
+	 * older checkpoints. Notifications for ids below the last completed id are ignored.
+	 *
+	 * @param completedCheckpointId id of the checkpoint that completed.
+	 */
+	@Override
+	public void notifyCheckpointComplete(long completedCheckpointId) {
+		synchronized (materializedSstFiles) {
+
+			final boolean isStale = completedCheckpointId < lastCompletedCheckpointId;
+
+			if (!isStale) {
+				materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
+				lastCompletedCheckpointId = completedCheckpointId;
+			}
+		}
+	}
+
+	/**
+	 * Encapsulates the process to perform an incremental snapshot of a RocksDBKeyedStateBackend.
+	 */
+	private final class RocksDBIncrementalSnapshotOperation {
+
+		/**
+		 * Stream factory that creates the output streams to DFS.
+		 */
+		private final CheckpointStreamFactory checkpointStreamFactory;
+
+		/**
+		 * Id for the current checkpoint.
+		 */
+		private final long checkpointId;
+
+		/**
+		 * All sst files that were part of the last previously completed checkpoint.
+		 */
+		private Set<StateHandleID> baseSstFiles;
+
+		/**
+		 * The state meta data.
+		 */
+		private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
+
+		/**
+		 * Local directory for the RocksDB native backup.
+		 */
+		private SnapshotDirectory localBackupDirectory;
+
+		// Registry for all opened i/o streams
+		private final CloseableRegistry closeableRegistry;
+
+		// new sst files since the last completed checkpoint
+		private final Map<StateHandleID, StreamStateHandle> sstFiles;
+
+		// handles to the misc files in the current snapshot
+		private final Map<StateHandleID, StreamStateHandle> miscFiles;
+
+		// This lease protects from concurrent disposal of the native rocksdb instance.
+		private final ResourceGuard.Lease dbLease;
+
+		private SnapshotResult<StreamStateHandle> metaStateHandle;
+
+		/**
+		 * Creates the operation with empty result collections and acquires a lease on the RocksDB
+		 * instance. The lease is closed again in {@link #releaseResources(boolean)}.
+		 *
+		 * @param checkpointStreamFactory factory for the streams that the files are uploaded to.
+		 * @param localBackupDirectory directory that receives the native RocksDB checkpoint.
+		 * @param checkpointId id of the checkpoint this operation belongs to.
+		 */
+		private RocksDBIncrementalSnapshotOperation(
+			CheckpointStreamFactory checkpointStreamFactory,
+			SnapshotDirectory localBackupDirectory,
+			long checkpointId) throws IOException {
+
+			this.checkpointStreamFactory = checkpointStreamFactory;
+			this.checkpointId = checkpointId;
+			this.localBackupDirectory = localBackupDirectory;
+			this.stateMetaInfoSnapshots = new ArrayList<>();
+			this.closeableRegistry = new CloseableRegistry();
+			this.sstFiles = new HashMap<>();
+			this.miscFiles = new HashMap<>();
+			// stays null until runSnapshot() has materialized the meta data
+			this.metaStateHandle = null;
+			this.dbLease = rocksDBResourceGuard.acquireResource();
+		}
+
+		/**
+		 * Copies one file of the local backup directory into a new checkpoint output stream with
+		 * {@code SHARED} scope. Both streams are registered with the operation's closeable registry
+		 * while the copy is running.
+		 *
+		 * @param filePath path of the file to copy.
+		 * @return the handle of the written state, or {@code null} if the output stream was no longer
+		 *         registered with the registry when the copy finished.
+		 */
+		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
+			FSDataInputStream inputStream = null;
+			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
+
+			try {
+				final byte[] buffer = new byte[8 * 1024];
+
+				FileSystem backupFileSystem = localBackupDirectory.getFileSystem();
+				inputStream = backupFileSystem.open(filePath);
+				closeableRegistry.registerCloseable(inputStream);
+
+				outputStream = checkpointStreamFactory
+					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+				closeableRegistry.registerCloseable(outputStream);
+
+				// plain copy loop; read() returning -1 signals the end of the file
+				while (true) {
+					int numBytes = inputStream.read(buffer);
+
+					if (numBytes == -1) {
+						break;
+					}
+
+					outputStream.write(buffer, 0, numBytes);
+				}
+
+				StreamStateHandle result = null;
+				// the handle is only created if we can take the stream back from the registry
+				if (closeableRegistry.unregisterCloseable(outputStream)) {
+					result = outputStream.closeAndGetHandle();
+					// mark as handled so that the finally block does not close it again
+					outputStream = null;
+				}
+				return result;
+
+			} finally {
+
+				// each stream is closed here only if it is still registered
+				if (closeableRegistry.unregisterCloseable(inputStream)) {
+					inputStream.close();
+				}
+
+				if (closeableRegistry.unregisterCloseable(outputStream)) {
+					outputStream.close();
+				}
+			}
+		}
+
+		/**
+		 * Writes the key serializer and the state meta data snapshots to a checkpoint stream with
+		 * {@code EXCLUSIVE} scope. With local recovery enabled a duplicating stream is used, otherwise
+		 * a simple one.
+		 *
+		 * @return the result of finalizing the stream.
+		 * @throws IOException if the stream is no longer registered with the registry after writing.
+		 */
+		@Nonnull
+		private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
+
+			CheckpointStreamWithResultProvider streamWithResultProvider =
+
+				localRecoveryConfig.isLocalRecoveryEnabled() ?
+
+					CheckpointStreamWithResultProvider.createDuplicatingStream(
+						checkpointId,
+						CheckpointedStateScope.EXCLUSIVE,
+						checkpointStreamFactory,
+						localRecoveryConfig.getLocalStateDirectoryProvider()) :
+
+					CheckpointStreamWithResultProvider.createSimpleStream(
+						CheckpointedStateScope.EXCLUSIVE,
+						checkpointStreamFactory);
+
+			try {
+				closeableRegistry.registerCloseable(streamWithResultProvider);
+
+				//no need for compression scheme support because sst-files are already compressed
+				KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(
+						keySerializer,
+						stateMetaInfoSnapshots,
+						false);
+
+				DataOutputView out =
+					new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream());
+
+				serializationProxy.write(out);
+
+				// finalize only if we can take the provider back from the registry
+				if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+					SnapshotResult<StreamStateHandle> result =
+						streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
+					// mark as handled so that the finally block leaves it alone
+					streamWithResultProvider = null;
+					return result;
+				} else {
+					throw new IOException("Stream already closed and cannot return a handle.");
+				}
+			} finally {
+				// failure path: close the provider if it is still registered
+				if (streamWithResultProvider != null) {
+					if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
+						IOUtils.closeQuietly(streamWithResultProvider);
+					}
+				}
+			}
+		}
+
+		/**
+		 * Synchronous part of the snapshot: determines the sst files of the last completed checkpoint
+		 * as comparison base, snapshots the meta data of all registered states and lets RocksDB create
+		 * a native checkpoint in the local backup directory.
+		 *
+		 * @throws IllegalStateException if the local backup directory already exists.
+		 */
+		void takeSnapshot() throws Exception {
+
+			// use the last completed checkpoint as the comparison base.
+			final long baseCheckpointId;
+			synchronized (materializedSstFiles) {
+				baseCheckpointId = lastCompletedCheckpointId;
+				baseSstFiles = materializedSstFiles.get(baseCheckpointId);
+			}
+
+			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+				"assuming the following (shared) files as base: {}.", checkpointId, baseCheckpointId, baseSstFiles);
+
+			// save meta data
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo : kvStateInformation.values()) {
+				stateMetaInfoSnapshots.add(stateInfo.f1.snapshot());
+			}
+
+			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);
+
+			final boolean backupDirectoryPresent = localBackupDirectory.exists();
+			if (backupDirectoryPresent) {
+				throw new IllegalStateException("Unexpected existence of the backup directory.");
+			}
+
+			// create hard links of living files in the snapshot path
+			try (Checkpoint nativeCheckpoint = Checkpoint.create(db)) {
+				nativeCheckpoint.createCheckpoint(localBackupDirectory.getDirectory().getPath());
+			}
+		}
+
+		/**
+		 * Asynchronous part of the snapshot: writes the meta data, uploads every file of the local
+		 * backup directory that is not already part of the base checkpoint, registers the sst files of
+		 * this checkpoint and builds the resulting state handles.
+		 *
+		 * <p>Problems while completing the local (task-owned) snapshot are logged and not propagated;
+		 * in that case the result carries only the job manager owned handle.
+		 *
+		 * @return the snapshot result, with local state if the local snapshot could be completed.
+		 */
+		@Nonnull
+		SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
+
+			cancelStreamRegistry.registerCloseable(closeableRegistry);
+
+			// write meta data
+			metaStateHandle = materializeMetaData();
+
+			// sanity checks - they should never fail
+			Preconditions.checkNotNull(metaStateHandle,
+				"Metadata was not properly created.");
+			Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
+				"Metadata for job manager was not properly created.");
+
+			// write state data
+			Preconditions.checkState(localBackupDirectory.exists());
+
+			FileStatus[] fileStatuses = localBackupDirectory.listStatus();
+			if (fileStatuses != null) {
+				for (FileStatus fileStatus : fileStatuses) {
+					final Path filePath = fileStatus.getPath();
+					final String fileName = filePath.getName();
+					final StateHandleID stateHandleID = new StateHandleID(fileName);
+
+					if (fileName.endsWith(SST_FILE_SUFFIX)) {
+						final boolean existsAlready =
+							baseSstFiles != null && baseSstFiles.contains(stateHandleID);
+
+						if (existsAlready) {
+							// we introduce a placeholder state handle, that is replaced with the
+							// original from the shared state registry (created from a previous checkpoint)
+							sstFiles.put(
+								stateHandleID,
+								new PlaceholderStreamStateHandle());
+						} else {
+							sstFiles.put(stateHandleID, materializeStateData(filePath));
+						}
+					} else {
+						// everything that is not an sst file is always uploaded
+						StreamStateHandle fileHandle = materializeStateData(filePath);
+						miscFiles.put(stateHandleID, fileHandle);
+					}
+				}
+			}
+
+			synchronized (materializedSstFiles) {
+				materializedSstFiles.put(checkpointId, sstFiles.keySet());
+			}
+
+			IncrementalKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalKeyedStateHandle(
+				backendUID,
+				keyGroupRange,
+				checkpointId,
+				sstFiles,
+				miscFiles,
+				metaStateHandle.getJobManagerOwnedSnapshot());
+
+			// may be null, see the null check before the local handle is built below
+			StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot();
+			DirectoryStateHandle directoryStateHandle = null;
+
+			try {
+
+				directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
+			} catch (IOException ex) {
+
+				Exception collector = ex;
+
+				// only discard the local meta data if there is any; calling discardState() on a missing
+				// handle would raise a NullPointerException that ends up attached to the warning below
+				if (taskLocalSnapshotMetaDataStateHandle != null) {
+					try {
+						taskLocalSnapshotMetaDataStateHandle.discardState();
+					} catch (Exception discardEx) {
+						collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
+					}
+				}
+
+				LOG.warn("Problem with local state snapshot.", collector);
+			}
+
+			if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
+
+				IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
+					new IncrementalLocalKeyedStateHandle(
+						backendUID,
+						checkpointId,
+						directoryStateHandle,
+						keyGroupRange,
+						taskLocalSnapshotMetaDataStateHandle,
+						sstFiles.keySet());
+				return SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
+			} else {
+				return SnapshotResult.of(jmIncrementalKeyedStateHandle);
+			}
+		}
+
+		void stop() {
+
+			if (cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+				try {
+					closeableRegistry.close();
+				} catch (IOException e) {
+					LOG.warn("Could not properly close io streams.", e);
+				}
+			}
+		}
+
+		/**
+		 * Releases everything the operation holds: the lease on the RocksDB instance, the closeable
+		 * registry and the local backup directory. If the snapshot was canceled, the state written so
+		 * far (meta data, misc files, sst files, completed local directory) is discarded as well.
+		 * All failures are logged and not propagated.
+		 *
+		 * @param canceled whether the state that was already written must be discarded.
+		 */
+		void releaseResources(boolean canceled) {
+
+			dbLease.close();
+
+			if (cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
+				try {
+					closeableRegistry.close();
+				} catch (IOException e) {
+					LOG.warn("Exception on closing registry.", e);
+				}
+			}
+
+			try {
+				if (localBackupDirectory.exists()) {
+					LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
+					// NOTE(review): what cleanup() removes presumably depends on the kind of snapshot
+					// directory (temporary vs. permanent) - confirm against SnapshotDirectory
+					boolean cleanupOk = localBackupDirectory.cleanup();
+
+					if (!cleanupOk) {
+						LOG.debug("Could not properly cleanup local RocksDB backup directory.");
+					}
+				}
+			} catch (IOException e) {
+				LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
+			}
+
+			if (canceled) {
+				Collection<StateObject> statesToDiscard =
+					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
+
+				// metaStateHandle is still null here if the operation was canceled before runSnapshot()
+				// materialized the meta data
+				statesToDiscard.add(metaStateHandle);
+				statesToDiscard.addAll(miscFiles.values());
+				statesToDiscard.addAll(sstFiles.values());
+
+				try {
+					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard states.", e);
+				}
+
+				// a local snapshot that was already completed has to be discarded through its handle
+				if (localBackupDirectory.isSnapshotCompleted()) {
+					try {
+						DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
+						if (directoryStateHandle != null) {
+							directoryStateHandle.discardState();
+						}
+					} catch (Exception e) {
+						LOG.warn("Could not properly discard local state.", e);
+					}
+				}
+			}
+		}
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
new file mode 100644
index 0000000..bf2bbdb
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+/**
+ * Constants and helper methods shared by the code that creates and restores RocksDB snapshots for
+ * {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
+ */
+public class RocksSnapshotUtil {
+
+	/** File suffix of sstable files. */
+	public static final String SST_FILE_SUFFIX = ".sst";
+
+	/** Mask that selects the highest bit of a byte. */
+	public static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
+
+	/** Marker value that is written at the end of a key-group. */
+	public static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
+
+	/** This class only offers static members and is never instantiated. */
+	private RocksSnapshotUtil() {
+		throw new AssertionError();
+	}
+
+	/**
+	 * Sets the highest bit of the first key byte, the flag that signals that meta data follows.
+	 *
+	 * @param key the key bytes, modified in place; must contain at least one byte.
+	 */
+	public static void setMetaDataFollowsFlagInKey(byte[] key) {
+		key[0] = (byte) (key[0] | FIRST_BIT_IN_BYTE_MASK);
+	}
+
+	/**
+	 * Clears the highest bit of the first key byte.
+	 *
+	 * @param key the key bytes, modified in place; must contain at least one byte.
+	 */
+	public static void clearMetaDataFollowsFlag(byte[] key) {
+		key[0] = (byte) (key[0] & ~FIRST_BIT_IN_BYTE_MASK);
+	}
+
+	/**
+	 * Tells whether the highest bit of the first key byte is set.
+	 *
+	 * @param key the key bytes; must contain at least one byte.
+	 * @return {@code true} if the flag is set.
+	 */
+	public static boolean hasMetaDataFollowsFlag(byte[] key) {
+		return (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
new file mode 100644
index 0000000..efebe8c
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/SnapshotStrategyBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.SnapshotStrategy;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Base class for {@link SnapshotStrategy} implementations on RocksDB.
+ *
+ * <p>The class only stores the collaborators passed to its constructor in protected final fields, so that
+ * subclasses can access them; it does not implement any snapshot logic itself.
+ *
+ * @param <K> type of the backend keys.
+ */
+public abstract class SnapshotStrategyBase<K> implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
+
+	/** The RocksDB instance handed to this strategy. */
+	@Nonnull
+	protected final RocksDB db;
+
+	/** Resource guard handed to this strategy (NOTE(review): presumably guards access to {@link #db} -- confirm). */
+	@Nonnull
+	protected final ResourceGuard rocksDBResourceGuard;
+
+	/** Serializer for the backend keys of type {@code K}. */
+	@Nonnull
+	protected final TypeSerializer<K> keySerializer;
+
+	/**
+	 * Map from a name to the pair of column family handle and registered state meta information
+	 * (NOTE(review): the key is presumably the state name -- confirm against the backend).
+	 */
+	@Nonnull
+	protected final LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
+
+	/** The key-group range handed to this strategy (NOTE(review): presumably the range of the owning backend). */
+	@Nonnull
+	protected final KeyGroupRange keyGroupRange;
+
+	/**
+	 * Non-negative byte count for the key-group prefix (NOTE(review): presumably the number of bytes used to
+	 * encode the key-group in front of each key -- confirm).
+	 */
+	@Nonnegative
+	protected final int keyGroupPrefixBytes;
+
+	/** The local recovery configuration handed to this strategy. */
+	@Nonnull
+	protected final LocalRecoveryConfig localRecoveryConfig;
+
+	/**
+	 * Registry of closeables handed to this strategy (NOTE(review): presumably holds streams that must be
+	 * closed when a snapshot is cancelled -- confirm).
+	 */
+	@Nonnull
+	protected final CloseableRegistry cancelStreamRegistry;
+
+	/**
+	 * Creates the base by storing every argument in the field of the same name. The constructor performs no
+	 * validation and no defensive copies; in particular {@code kvStateInformation} is kept by reference.
+	 *
+	 * @param db the RocksDB instance.
+	 * @param rocksDBResourceGuard the resource guard.
+	 * @param keySerializer the serializer for the backend keys.
+	 * @param kvStateInformation map from a name to column family handle and registered state meta information.
+	 * @param keyGroupRange the key-group range.
+	 * @param keyGroupPrefixBytes the non-negative key-group prefix byte count.
+	 * @param localRecoveryConfig the local recovery configuration.
+	 * @param cancelStreamRegistry the registry of closeables.
+	 */
+	public SnapshotStrategyBase(
+		@Nonnull RocksDB db,
+		@Nonnull ResourceGuard rocksDBResourceGuard,
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull LinkedHashMap<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnull LocalRecoveryConfig localRecoveryConfig,
+		@Nonnull CloseableRegistry cancelStreamRegistry) {
+
+		this.db = db;
+		this.rocksDBResourceGuard = rocksDBResourceGuard;
+		this.keySerializer = keySerializer;
+		this.kvStateInformation = kvStateInformation;
+		this.keyGroupRange = keyGroupRange;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.localRecoveryConfig = localRecoveryConfig;
+		this.cancelStreamRegistry = cancelStreamRegistry;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index e344638..c872553 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -91,6 +91,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.FIRST_BIT_IN_BYTE_MASK;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag;
+import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.setMetaDataFollowsFlagInKey;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.spy;
@@ -425,21 +430,19 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 	@Test
 	public void testConsistentSnapshotSerializationFlagsAndMasks() {
 
-		Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK);
-		Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+		Assert.assertEquals(0xFFFF, END_OF_KEY_GROUP_MARK);
+		Assert.assertEquals(0x80, FIRST_BIT_IN_BYTE_MASK);
 
 		byte[] expectedKey = new byte[] {42, 42};
 		byte[] modKey = expectedKey.clone();
 
-		Assert.assertFalse(
-			RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+		Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
 
-		RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
-		Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+		setMetaDataFollowsFlagInKey(modKey);
+		Assert.assertTrue(hasMetaDataFollowsFlag(modKey));
 
-		RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey);
-		Assert.assertFalse(
-			RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+		clearMetaDataFollowsFlag(modKey);
+		Assert.assertFalse(hasMetaDataFollowsFlag(modKey));
 
 		Assert.assertTrue(Arrays.equals(expectedKey, modKey));
 	}
@@ -504,12 +507,12 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		@Nullable
 		@Override
-		public StreamStateHandle closeAndGetHandle() throws IOException {
+		public StreamStateHandle closeAndGetHandle() {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public long getPos() throws IOException {
+		public long getPos() {
 			throw new UnsupportedOperationException();
 		}
 
@@ -529,7 +532,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 		}
 
 		@Override
-		public void close() throws IOException {
+		public void close() {
 			throw new UnsupportedOperationException();
 		}
 	}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 0ea0d3f..4916251 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -191,6 +191,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		allCreatedCloseables = new ArrayList<>();
 
 		keyedStateBackend.db = spy(keyedStateBackend.db);
+		keyedStateBackend.initializeSnapshotStrategy(null);
 
 		doAnswer(new Answer<Object>() {