Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:23 UTC

[05/27] flink git commit: [FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware

[FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware

This change makes the RocksDB backend key-group aware by building on the
changes in the previous commit.
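
The core of the change: every entry the backend writes to RocksDB is now prefixed with the key-group index that the KeyGroupAssigner computes for the entry's key, so the database iterates in key-group order and snapshots can be written as contiguous, offset-indexed key-group ranges. A minimal sketch of the assigner contract this relies on (the class below is illustrative, not Flink's implementation):

    // Sketch of the KeyGroupAssigner contract assumed by this commit;
    // the class and its hashing scheme are illustrative placeholders.
    public class SketchKeyGroupAssigner<K> {

        private final int numberOfKeyGroups;

        public SketchKeyGroupAssigner(int numberOfKeyGroups) {
            this.numberOfKeyGroups = numberOfKeyGroups;
        }

        /** Maps a key to a key-group index in [0, numberOfKeyGroups). */
        public int getKeyGroupIndex(K key) {
            // mask off the sign bit to avoid negative results for Integer.MIN_VALUE hashes
            return (key.hashCode() & 0x7FFFFFFF) % numberOfKeyGroups;
        }
    }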


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addd0842
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addd0842
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addd0842

Branch: refs/heads/master
Commit: addd0842f9d74b05fc44374c6682b6e697603939
Parents: 4809f53
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Aug 17 14:50:18 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  98 ++-
 .../streaming/state/RocksDBFoldingState.java    |  25 +-
 .../state/RocksDBKeyedStateBackend.java         | 786 ++++++++++++++++++-
 .../streaming/state/RocksDBListState.java       |  22 +-
 .../streaming/state/RocksDBReducingState.java   |  24 +-
 .../streaming/state/RocksDBStateBackend.java    |  24 +-
 .../streaming/state/RocksDBValueState.java      |  27 +-
 .../state/RocksDBAsyncKVSnapshotTest.java       | 330 --------
 .../state/RocksDBAsyncSnapshotTest.java         | 397 ++++++++++
 .../state/RocksDBMergeIteratorTest.java         | 140 ++++
 .../state/RocksDBStateBackendConfigTest.java    | 690 ++++++++--------
 .../io/async/AbstractAsyncIOCallable.java       | 157 ++++
 .../runtime/io/async/AsyncDoneCallback.java     |  31 +
 .../flink/runtime/io/async/AsyncStoppable.java  |  47 ++
 .../async/AsyncStoppableTaskWithCallback.java   |  55 ++
 .../io/async/StoppableCallbackCallable.java     |  30 +
 .../memory/MemCheckpointStreamFactory.java      |   6 +-
 .../streaming/runtime/tasks/StreamTask.java     |  18 +
 18 files changed, 2136 insertions(+), 771 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 710f506..cbc2757 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,8 +20,11 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
@@ -30,7 +33,6 @@ import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 /**
@@ -56,7 +58,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	private N currentNamespace;
 
 	/** Backend that holds the actual RocksDB instance where we store state */
-	protected RocksDBKeyedStateBackend backend;
+	protected RocksDBKeyedStateBackend<K> backend;
 
 	/** The column family of this particular instance of state */
 	protected ColumnFamilyHandle columnFamily;
@@ -69,14 +71,20 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	 */
 	private final WriteOptions writeOptions;
 
+	protected final ByteArrayOutputStreamWithPos keySerializationStream;
+	protected final DataOutputView keySerializationDateDataOutputView;
+
+	private final boolean ambiguousKeyPossible;
+
 	/**
 	 * Creates a new RocksDB backed state.
 	 *  @param namespaceSerializer The serializer for the namespace.
 	 */
-	protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
+	protected AbstractRocksDBState(
+			ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			SD stateDesc,
-			RocksDBKeyedStateBackend backend) {
+			RocksDBKeyedStateBackend<K> backend) {
 
 		this.namespaceSerializer = namespaceSerializer;
 		this.backend = backend;
@@ -85,31 +93,27 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 
 		writeOptions = new WriteOptions();
 		writeOptions.setDisableWAL(true);
-
 		this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+
+		this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
+		this.keySerializationDateDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
+		this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
+				&& (namespaceSerializer.getLength() < 0);
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
 	public void clear() {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			backend.db.remove(columnFamily, writeOptions, key);
 		} catch (IOException|RocksDBException e) {
 			throw new RuntimeException("Error while removing entry from RocksDB", e);
 		}
 	}
 
-	protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
-		backend.getKeySerializer().serialize(backend.getCurrentKey(), out);
-		out.writeByte(42);
-		namespaceSerializer.serialize(currentNamespace, out);
-	}
-
 	@Override
 	public void setCurrentNamespace(N namespace) {
 		this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
@@ -118,17 +122,67 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	@Override
 	@SuppressWarnings("unchecked")
 	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		// Serialized key and namespace is expected to be of the same format
-		// as writeKeyAndNamespace()
 		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
 
-		byte[] value = backend.db.get(columnFamily, serializedKeyAndNamespace);
+		//TODO make KvStateRequestSerializer key-group aware to save this round trip and key-group computation
+		Tuple2<K, N> des = KvStateRequestSerializer.<K, N>deserializeKeyAndNamespace(
+				serializedKeyAndNamespace,
+				backend.getKeySerializer(),
+				namespaceSerializer);
+
+		int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
+		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
+		return backend.db.get(columnFamily, keySerializationStream.toByteArray());
+
+	}
+
+	protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
+		writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), backend.getCurrentKey(), currentNamespace);
+	}
+
+	protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N namespace) throws IOException {
+		keySerializationStream.reset();
+		writeKeyGroup(keyGroup);
+		writeKey(key);
+		writeNameSpace(namespace);
+	}
+
+	private void writeKeyGroup(int keyGroup) throws IOException {
+		for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
+			keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
+		}
+	}
+
+	private void writeKey(K key) throws IOException {
+		//write key
+		int beforeWrite = (int) keySerializationStream.getPosition();
+		backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView);
+
+		if (ambiguousKeyPossible) {
+			//write size of key
+			writeLengthFrom(beforeWrite);
+		}
+	}
+
+	private void writeNameSpace(N namespace) throws IOException {
+		int beforeWrite = (int) keySerializationStream.getPosition();
+		namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView);
 
-		if (value != null) {
-			return value;
-		} else {
-			return null;
+		if (ambiguousKeyPossible) {
+			//write length of namespace
+			writeLengthFrom(beforeWrite);
 		}
 	}
 
+	private void writeLengthFrom(int fromPosition) throws IOException {
+		int length = (int) (keySerializationStream.getPosition() - fromPosition);
+		writeVariableIntBytes(length);
+	}
+
+	private void writeVariableIntBytes(int value) throws IOException {
+		do {
+			keySerializationDateDataOutputView.writeByte(value);
+			value >>>= 8;
+		} while (value != 0);
+	}
 }

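The composed RocksDB key written by writeKeyWithGroupAndNamespace is laid out as [key-group prefix | serialized key | serialized namespace]. The prefix is one byte for up to 128 key groups and two bytes otherwise, and the key and namespace are each followed by their encoded byte length whenever both serializers are variable-length (the ambiguous case in which key/namespace boundaries could otherwise not be recovered). The big-endian prefix is what makes byte-wise comparison of composed keys agree with comparison by key-group index, which the snapshot code later in this diff depends on. A standalone sketch of the same layout, with UTF-8 strings standing in for Flink's TypeSerializers:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Standalone sketch of the key layout produced by writeKeyWithGroupAndNamespace;
    // String keys/namespaces stand in for Flink's TypeSerializers.
    public class KeyLayoutSketch {

        public static byte[] composeKey(
                int keyGroup, int keyGroupPrefixBytes,
                String key, String namespace, boolean ambiguousKeyPossible) throws IOException {

            ByteArrayOutputStream out = new ByteArrayOutputStream();

            // 1) key-group prefix, big endian, in 1 or 2 bytes
            for (int i = keyGroupPrefixBytes; --i >= 0;) {
                out.write(keyGroup >>> (i << 3));
            }

            // 2) serialized key, followed by its length if the encoding is ambiguous
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            out.write(keyBytes);
            if (ambiguousKeyPossible) {
                writeVariableIntBytes(out, keyBytes.length);
            }

            // 3) serialized namespace, same rule
            byte[] nsBytes = namespace.getBytes(StandardCharsets.UTF_8);
            out.write(nsBytes);
            if (ambiguousKeyPossible) {
                writeVariableIntBytes(out, nsBytes.length);
            }
            return out.toByteArray();
        }

        // same little-endian, zero-terminated length encoding as in the patch
        private static void writeVariableIntBytes(ByteArrayOutputStream out, int value) {
            do {
                out.write(value);
                value >>>= 8;
            } while (value != 0);
        }
    }
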
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 8c0799b..3018f7b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 /**
@@ -66,7 +65,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc,
-			RocksDBKeyedStateBackend backend) {
+			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 
@@ -79,11 +78,9 @@ public class RocksDBFoldingState<K, N, T, ACC>
 
 	@Override
 	public ACC get() {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return null;
@@ -96,23 +93,21 @@ public class RocksDBFoldingState<K, N, T, ACC>
 
 	@Override
 	public void add(T value) throws IOException {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
-
+			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 			if (valueBytes == null) {
-				baos.reset();
+				keySerializationStream.reset();
 				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
-				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			} else {
 				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				ACC newValue = foldFunction.fold(oldValue, value);
-				baos.reset();
+				keySerializationStream.reset();
 				valueSerializer.serialize(newValue, out);
-				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

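Like the other state classes, the folding state now serializes its composed key once into the shared keySerializationStream and then runs a plain get-fold-put cycle against its column family. A condensed sketch of that read-modify-write pattern against the raw RocksDB Java API, with a long counter standing in for the accumulator type and its serializer:

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    import java.nio.ByteBuffer;

    // Sketch of the read-modify-write pattern behind RocksDBFoldingState#add;
    // a long counter stands in for the folded accumulator and its serializer.
    public class FoldSketch {

        public static void addToCounter(
                RocksDB db, ColumnFamilyHandle column, WriteOptions writeOptions,
                byte[] composedKey, long delta) throws RocksDBException {

            byte[] valueBytes = db.get(column, composedKey);
            // fold the new element into the previous accumulator (or the default value)
            long accumulator = (valueBytes == null) ? 0L : ByteBuffer.wrap(valueBytes).getLong();
            accumulator += delta;
            db.put(column, writeOptions, composedKey,
                    ByteBuffer.allocate(8).putLong(accumulator).array());
        }
    }
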
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 63f1fa2..a1634b2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
@@ -29,30 +30,49 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
+import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.PriorityQueue;
+import java.util.concurrent.RunnableFuture;
 
 /**
  * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
@@ -80,25 +100,29 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 	private final File instanceRocksDBPath;
 
 	/**
+	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
+	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
+	 * iterating over a disposed db.
+	 */
+	private final SerializableObject dbDisposeLock = new SerializableObject();
+
+	/**
 	 * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
 	 * to store state. The different k/v states that we have don't each have their own RocksDB
 	 * instance. They all write to this instance but to their own column family.
 	 */
+	@GuardedBy("dbDisposeLock")
 	protected volatile RocksDB db;
 
 	/**
-	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
-	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
-	 * iterating over a disposed db.
-	 */
-	private final SerializableObject dbCleanupLock = new SerializableObject();
-
-	/**
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
 	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
 
+	/** Number of bytes required to prefix the key groups. */
+	private final int keyGroupPrefixBytes;
+
 	public RocksDBKeyedStateBackend(
 			JobID jobId,
 			String operatorIdentifier,
@@ -108,7 +132,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
 			KeyGroupAssigner<K> keyGroupAssigner,
-	        KeyGroupRange keyGroupRange
+			KeyGroupRange keyGroupRange
 	) throws Exception {
 
 		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
@@ -147,35 +171,543 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
-
+		keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
 		kvStateInformation = new HashMap<>();
 	}
 
+	public RocksDBKeyedStateBackend(
+			JobID jobId,
+			String operatorIdentifier,
+			File instanceBasePath,
+			DBOptions dbOptions,
+			ColumnFamilyOptions columnFamilyOptions,
+			TaskKvStateRegistry kvStateRegistry,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange,
+			List<KeyGroupsStateHandle> restoreState
+	) throws Exception {
+		this(
+			jobId,
+			operatorIdentifier,
+			instanceBasePath,
+			dbOptions,
+			columnFamilyOptions,
+			kvStateRegistry,
+			keySerializer,
+			keyGroupAssigner,
+			keyGroupRange);
+
+		LOG.info("Initializing RocksDB keyed state backend from snapshot.");
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
+		}
+
+		RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this);
+		restoreOperation.doRestore(restoreState);
+	}
+
+	/**
+	 * Should only be called by one thread.
+	 *
+	 * @see java.io.Closeable
+	 *
+	 * @throws Exception
+	 */
 	@Override
 	public void close() throws Exception {
 		super.close();
 
-		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
-				}
+		final RocksDB cleanupRockDBReference;
 
-				db.dispose();
-				db = null;
+		// Acquire the lock on dbDisposeLock, so that no ongoing snapshots access the db during cleanup
+		synchronized (dbDisposeLock) {
+			// IMPORTANT: null the reference to signal potential async checkpoint workers that the db was disposed, as
+			// working on the disposed object results in SEGFAULTS. Other code has to check field #db for null
+			// and access it in a synchronized block that locks on #dbDisposeLock.
+			cleanupRockDBReference = db;
+			db = null;
+		}
+
+		// Dispose decoupled db
+		if (cleanupRockDBReference != null) {
+			for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+				column.f0.dispose();
 			}
+			cleanupRockDBReference.dispose();
 		}
 
 		FileUtils.deleteDirectory(instanceBasePath);
 	}
 
+	public int getKeyGroupPrefixBytes() {
+		return keyGroupPrefixBytes;
+	}
+
+	/**
+	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
+	 * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always
+	 * be called by the same thread.
+	 *
+	 * @param checkpointId The Id of the checkpoint.
+	 * @param timestamp The timestamp of the checkpoint.
+	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 *
+	 * @return Future to the state handle of the snapshot data.
+	 * @throws Exception
+	 */
 	@Override
-	public Future<KeyGroupsStateHandle> snapshot(
-			long checkpointId,
-			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception {
-		throw new RuntimeException("Not implemented.");
+	public RunnableFuture<KeyGroupsStateHandle> snapshot(
+			final long checkpointId,
+			final long timestamp,
+			final CheckpointStreamFactory streamFactory) throws Exception {
+
+		long startTime = System.currentTimeMillis();
+
+		if (kvStateInformation.isEmpty()) {
+			LOG.info("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + ". Returning null.");
+			return new DoneFuture<>(null);
+		}
+
+		final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory);
+		// hold the db lock while operating on the db to guard us against async db disposal
+		synchronized (dbDisposeLock) {
+			if (db != null) {
+				snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
+			} else {
+				throw new IOException("RocksDB closed.");
+			}
+		}
+
+		// implementation of the async IO operation, based on FutureTask
+		AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
+				new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+
+					@Override
+					public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+						snapshotOperation.openCheckpointStream();
+						return snapshotOperation.getOutStream();
+					}
+
+					@Override
+					public KeyGroupsStateHandle performOperation() throws Exception {
+						long startTime = System.currentTimeMillis();
+						try {
+							// hold the db lock while operating on the db to guard us against async db disposal
+							synchronized (dbDisposeLock) {
+								if (db != null) {
+									snapshotOperation.writeDBSnapshot();
+								} else {
+									throw new IOException("RocksDB closed.");
+								}
+							}
+
+						} finally {
+							snapshotOperation.closeCheckpointStream();
+						}
+
+						LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " +
+								Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+
+						return snapshotOperation.getSnapshotResultStateHandle();
+					}
+
+					@Override
+					public void done() {
+						// hold the db lock while operating on the db to guard us against async db disposal
+						synchronized (dbDisposeLock) {
+							if (db != null) {
+								snapshotOperation.releaseDBSnapshot();
+							}
+						}
+					}
+				};
+
+		LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
+				Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+
+		return AsyncStoppableTaskWithCallback.from(ioCallable);
+	}
+
+	/**
+	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
+	 */
+	static final class RocksDBSnapshotOperation {
+
+		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
+		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
+
+		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
+		private final CheckpointStreamFactory checkpointStreamFactory;
+
+		private long checkpointId;
+		private long checkpointTimeStamp;
+
+		private Snapshot snapshot;
+		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
+		private DataOutputView outputView;
+		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+		private KeyGroupsStateHandle snapshotResultStateHandle;
+
+		public RocksDBSnapshotOperation(
+				RocksDBKeyedStateBackend<?> stateBackend,
+				CheckpointStreamFactory checkpointStreamFactory) {
+
+			this.stateBackend = stateBackend;
+			this.checkpointStreamFactory = checkpointStreamFactory;
+			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
+		}
+
+		/**
+		 * 1) Create a snapshot object from RocksDB.
+		 *
+		 * @param checkpointId id of the checkpoint for which we take the snapshot
+		 * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
+		 */
+		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException {
+			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
+			this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
+			this.checkpointId = checkpointId;
+			this.checkpointTimeStamp = checkpointTimeStamp;
+			this.snapshot = stateBackend.db.getSnapshot();
+		}
+
+		/**
+		 * 2) Open the CheckpointStateOutputStream, provided by the checkpointStreamFactory, into which we will write.
+		 *
+		 * @throws Exception
+		 */
+		public void openCheckpointStream() throws Exception {
+			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
+			outStream = checkpointStreamFactory.
+					createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+			outputView = new DataOutputViewStreamWrapper(outStream);
+		}
+
+		/**
+		 * 3) Write the data that was captured by the snapshot object taken in (1).
+		 *
+		 * @throws IOException
+		 */
+		public void writeDBSnapshot() throws IOException, InterruptedException {
+			Preconditions.checkNotNull(snapshot, "No ongoing snapshot to write.");
+			Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
+			writeKVStateMetaData();
+			writeKVStateData();
+		}
+
+		/**
+		 * 4) Close the CheckpointStateOutputStream after writing and receive a state handle.
+		 *
+		 * @throws IOException
+		 */
+		public void closeCheckpointStream() throws IOException {
+			if (outStream != null) {
+				snapshotResultStateHandle = closeSnapshotStreamAndGetHandle();
+			}
+		}
+
+		/**
+		 * 5) Release the snapshot object for RocksDB and clean up.
+		 *
+		 */
+		public void releaseDBSnapshot() {
+			Preconditions.checkNotNull(snapshot, "No ongoing snapshot to release.");
+			stateBackend.db.releaseSnapshot(snapshot);
+			snapshot = null;
+			outStream = null;
+			outputView = null;
+			kvStateIterators = null;
+		}
+
+		/**
+		 * Returns the current CheckpointStateOutputStream (if it was opened and not yet closed) into which we write
+		 * the state snapshot.
+		 *
+		 * @return the current CheckpointStateOutputStream
+		 */
+		public CheckpointStreamFactory.CheckpointStateOutputStream getOutStream() {
+			return outStream;
+		}
+
+		/**
+		 * Returns a state handle to the snapshot after the snapshot procedure has completed, and null before that.
+		 *
+		 * @return state handle to the completed snapshot
+		 */
+		public KeyGroupsStateHandle getSnapshotResultStateHandle() {
+			return snapshotResultStateHandle;
+		}
+
+		private void writeKVStateMetaData() throws IOException, InterruptedException {
+			//write number of k/v states
+			outputView.writeInt(stateBackend.kvStateInformation.size());
+
+			int kvStateId = 0;
+			//iterate all column families, where each column family holds one k/v state, to write the metadata
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) {
+
+				//be cooperative and check for interruption from time to time in the hot loop
+				checkInterrupted();
+
+				//write StateDescriptor for this k/v state
+				ObjectOutputStream ooOut = new ObjectOutputStream(outStream);
+				ooOut.writeObject(column.getValue().f1);
+				//retrieve iterator for this k/v state
+				ReadOptions readOptions = new ReadOptions();
+				readOptions.setSnapshot(snapshot);
+				RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions);
+				kvStateIterators.add(new Tuple2<RocksIterator, Integer>(iterator, kvStateId));
+				++kvStateId;
+			}
+		}
+
+		private void writeKVStateData() throws IOException, InterruptedException {
+
+			RocksDBMergeIterator iterator = new RocksDBMergeIterator(kvStateIterators, stateBackend.keyGroupPrefixBytes);
+
+			byte[] previousKey = null;
+			byte[] previousValue = null;
+
+			//preamble: setup with first key-group as our lookahead
+			if (iterator.isValid()) {
+				//begin first key-group by recording the offset
+				keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
+				//write the k/v-state id as metadata
+				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+				outputView.writeShort(iterator.kvStateId());
+				previousKey = iterator.key();
+				previousValue = iterator.value();
+				iterator.next();
+			}
+
+			//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
+			while (iterator.isValid()) {
+
+				assert (!hasMetaDataFollowsFlag(previousKey));
+
+				//set signal in first key byte that meta data will follow in the stream after this k/v pair
+				if (iterator.isNewKeyGroup() || iterator.isNewKeyValueState()) {
+
+					//be cooperative and check for interruption from time to time in the hot loop
+					checkInterrupted();
+
+					setMetaDataFollowsFlagInKey(previousKey);
+				}
+
+				writeKeyValuePair(previousKey, previousValue);
+
+				//write meta data if we have to
+				if (iterator.isNewKeyGroup()) {
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					outputView.writeShort(END_OF_KEY_GROUP_MARK);
+					//begin new key-group
+					keyGroupRangeOffsets.setKeyGroupOffset(iterator.keyGroup(), outStream.getPos());
+					//write the k/v-state id
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					outputView.writeShort(iterator.kvStateId());
+				} else if (iterator.isNewKeyValueState()) {
+					//write the k/v-state id
+					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+					outputView.writeShort(iterator.kvStateId());
+				}
+
+				//request next k/v pair
+				previousKey = iterator.key();
+				previousValue = iterator.value();
+				iterator.next();
+			}
+
+			//epilogue: write last key-group
+			if (previousKey != null) {
+				assert (!hasMetaDataFollowsFlag(previousKey));
+				setMetaDataFollowsFlagInKey(previousKey);
+				writeKeyValuePair(previousKey, previousValue);
+				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
+				outputView.writeShort(END_OF_KEY_GROUP_MARK);
+			}
+		}
+
+		private KeyGroupsStateHandle closeSnapshotStreamAndGetHandle() throws IOException {
+			StreamStateHandle stateHandle = outStream.closeAndGetHandle();
+			outStream = null;
+			if (stateHandle != null) {
+				return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+			} else {
+				throw new IOException("Output stream returned null on close.");
+			}
+		}
+
+		private void writeKeyValuePair(byte[] key, byte[] value) throws IOException {
+			BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView);
+			BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView);
+		}
+
+		static void setMetaDataFollowsFlagInKey(byte[] key) {
+			key[0] |= FIRST_BIT_IN_BYTE_MASK;
+		}
+
+		static void clearMetaDataFollowsFlag(byte[] key) {
+			key[0] &= (~RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+		}
+
+		static boolean hasMetaDataFollowsFlag(byte[] key) {
+			return 0 != (key[0] & RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+		}
+
+		private static void checkInterrupted() throws InterruptedException {
+			if (Thread.currentThread().isInterrupted()) {
+				throw new InterruptedException("Snapshot canceled.");
+			}
+		}
+	}
+
+	/**
+	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
+	 */
+	static final class RocksDBRestoreOperation {
+
+		private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+
+		/** Current key-groups state handle from which we restore key-groups */
+		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
+		/** Current input stream we obtained from currentKeyGroupsStateHandle */
+		private FSDataInputStream currentStateHandleInStream;
+		/** Current data input view that wraps currentStateHandleInStream */
+		private DataInputView currentStateHandleInView;
+		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle */
+		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
+
+		/**
+		 * Creates a restore operation object for the given state backend instance.
+		 *
+		 * @param rocksDBKeyedStateBackend the state backend into which we restore
+		 */
+		public RocksDBRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
+		}
+
+		/**
+		 * Restores all key-groups data that is referenced by the passed state handles.
+		 *
+		 * @param keyGroupsStateHandles List of all key groups state handles that shall be restored.
+		 * @throws IOException
+		 * @throws ClassNotFoundException
+		 * @throws RocksDBException
+		 */
+		public void doRestore(List<KeyGroupsStateHandle> keyGroupsStateHandles)
+				throws IOException, ClassNotFoundException, RocksDBException {
+
+			for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) {
+				if (keyGroupsStateHandle != null) {
+					this.currentKeyGroupsStateHandle = keyGroupsStateHandle;
+					restoreKeyGroupsInStateHandle();
+				}
+			}
+		}
+
+		/**
+		 * Restores one key-groups state handle.
+		 *
+		 * @throws IOException
+		 * @throws RocksDBException
+		 * @throws ClassNotFoundException
+		 */
+		private void restoreKeyGroupsInStateHandle()
+				throws IOException, RocksDBException, ClassNotFoundException {
+			try {
+				currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
+				restoreKVStateMetaData();
+				restoreKVStateData();
+			} finally {
+				if (currentStateHandleInStream != null) {
+					currentStateHandleInStream.close();
+				}
+			}
+
+		}
+
+		/**
+		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle
+		 *
+		 * @throws IOException
+		 * @throws ClassNotFoundException
+		 * @throws RocksDBException
+		 */
+		private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+			//read number of k/v states
+			int numColumns = currentStateHandleInView.readInt();
+
+			//this list is ordered to align with the k/v state metadata read below
+			currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns);
+
+			//restore the empty columns for the k/v states through the metadata
+			for (int i = 0; i < numColumns; i++) {
+				ObjectInputStream ooIn = new ObjectInputStream(currentStateHandleInStream);
+				StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+				Tuple2<ColumnFamilyHandle, StateDescriptor> columnFamily = rocksDBKeyedStateBackend.
+						kvStateInformation.get(stateDescriptor.getName());
+
+				if (null == columnFamily) {
+					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
+							stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+
+					columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db.
+							createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+					rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
+				}
+
+				currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
+			}
+		}
+
+		/**
+		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle
+		 *
+		 * @throws IOException
+		 * @throws RocksDBException
+		 */
+		private void restoreKVStateData() throws IOException, RocksDBException {
+			//for all key-groups in the current state handle...
+			for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
+				long offset = keyGroupOffset.f1;
+				//only restore if the key-group is not empty
+				if (0L != offset) {
+					currentStateHandleInStream.seek(offset);
+					boolean keyGroupHasMoreKeys = true;
+					//TODO this could be aware of keyGroupPrefixBytes and read only one byte if possible
+					int kvStateId = currentStateHandleInView.readShort();
+					ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+					//insert all k/v pairs into DB
+					while (keyGroupHasMoreKeys) {
+						byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
+						byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
+						if (RocksDBSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+							//clear the signal bit in the key to make it ready for insertion again
+							RocksDBSnapshotOperation.clearMetaDataFollowsFlag(key);
+							rocksDBKeyedStateBackend.db.put(handle, key, value);
+							//TODO this could be aware of keyGroupPrefixBytes and read only one byte if possible
+							kvStateId = RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK
+									& currentStateHandleInView.readShort();
+							if (RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+								keyGroupHasMoreKeys = false;
+							} else {
+								handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
+							}
+						} else {
+							rocksDBKeyedStateBackend.db.put(handle, key, value);
+						}
+					}
+				}
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -197,12 +729,14 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 		if (stateInfo != null) {
 			if (!stateInfo.f1.equals(descriptor)) {
-				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
+				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 +
+						" trying to access with " + descriptor);
 			}
 			return stateInfo.f0;
 		}
 
-		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), columnOptions);
+		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
+				descriptor.getName().getBytes(), columnOptions);
 
 		try {
 			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
@@ -248,4 +782,206 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
 
 		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
+
+	/**
+	 * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
+	 * Used by {@link RocksDBMergeIterator}.
+	 */
+	static final class MergeIterator {
+
+		/**
+		 *
+		 * @param iterator The {@link RocksIterator} to wrap.
+		 * @param kvStateId Id of the K/V state to which this iterator belongs.
+		 */
+		public MergeIterator(RocksIterator iterator, int kvStateId) {
+			this.iterator = Preconditions.checkNotNull(iterator);
+			this.currentKey = iterator.key();
+			this.kvStateId = kvStateId;
+		}
+
+		private byte[] currentKey;
+		private final RocksIterator iterator;
+		private final int kvStateId;
+
+		public byte[] getCurrentKey() {
+			return currentKey;
+		}
+
+		public void setCurrentKey(byte[] currentKey) {
+			this.currentKey = currentKey;
+		}
+
+		public RocksIterator getIterator() {
+			return iterator;
+		}
+
+		public int getKvStateId() {
+			return kvStateId;
+		}
+	}
+
+	/**
+	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+	 * The resulting iteration sequence is ordered by (key-group, kv-state).
+	 */
+	static final class RocksDBMergeIterator {
+
+		private final PriorityQueue<MergeIterator> heap;
+		private final int keyGroupPrefixByteCount;
+		private boolean newKeyGroup;
+		private boolean newKVState;
+		private boolean valid;
+
+		private MergeIterator currentSubIterator;
+
+		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws IOException {
+			Preconditions.checkNotNull(kvStateIterators);
+			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+			Comparator<MergeIterator> iteratorComparator = new Comparator<MergeIterator>() {
+				@Override
+				public int compare(MergeIterator o1, MergeIterator o2) {
+					int arrayCmpRes = compareKeyGroupsForByteArrays(
+							o1.currentKey, o2.currentKey, keyGroupPrefixByteCount);
+					return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+				}
+			};
+
+			if (kvStateIterators.size() > 0) {
+				this.heap = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+					RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
+					rocksIterator.seekToFirst();
+					if (rocksIterator.isValid()) {
+						heap.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+					}
+				}
+				this.valid = !heap.isEmpty();
+				this.currentSubIterator = heap.poll();
+			} else {
+				// creating a PriorityQueue of size 0 results in an exception.
+				this.heap = null;
+				this.valid = false;
+			}
+
+			this.newKeyGroup = true;
+			this.newKVState = true;
+		}
+
+		/**
+		 * Advances the iterator. Should only be called if {@link #isValid()} returned true. Validity can only change after
+		 * calls to {@link #next()}.
+		 */
+		public void next() {
+			newKeyGroup = false;
+			newKVState = false;
+
+			final RocksIterator rocksIterator = currentSubIterator.getIterator();
+			rocksIterator.next();
+
+			byte[] oldKey = currentSubIterator.getCurrentKey();
+			if (rocksIterator.isValid()) {
+				currentSubIterator.currentKey = rocksIterator.key();
+
+				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
+					heap.offer(currentSubIterator);
+					currentSubIterator = heap.poll();
+					newKVState = currentSubIterator.getIterator() != rocksIterator;
+					detectNewKeyGroup(oldKey);
+				}
+			} else if (heap.isEmpty()) {
+				valid = false;
+			} else {
+				currentSubIterator = heap.poll();
+				newKVState = true;
+				detectNewKeyGroup(oldKey);
+			}
+
+		}
+
+		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
+			return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
+		}
+
+		private void detectNewKeyGroup(byte[] oldKey) {
+			if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
+				newKeyGroup = true;
+			}
+		}
+
+		/**
+		 * Returns the key-group for the current key.
+		 * @return key-group for the current key
+		 */
+		public int keyGroup() {
+			int result = 0;
+			//big endian decode
+			for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
+				result <<= 8;
+				result |= (currentSubIterator.currentKey[i] & 0xFF);
+			}
+			return result;
+		}
+
+		public byte[] key() {
+			return currentSubIterator.getCurrentKey();
+		}
+
+		public byte[] value() {
+			return currentSubIterator.getIterator().value();
+		}
+
+		/**
+		 * Returns the Id of the k/v state to which the current key belongs.
+		 * @return Id of K/V state to which the current key belongs.
+		 */
+		public int kvStateId() {
+			return currentSubIterator.getKvStateId();
+		}
+
+		/**
+		 * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
+		 * @return true iff the current key belongs to a different k/v-state than its predecessor.
+		 */
+		public boolean isNewKeyValueState() {
+			return newKVState;
+		}
+
+		/**
+		 * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
+		 * @return true iff the current key belongs to a different key-group than its predecessor.
+		 */
+		public boolean isNewKeyGroup() {
+			return newKeyGroup;
+		}
+
+		/**
+		 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
+		 * {@link #next()} should only be called if this returned true. It should be checked after each call to
+		 * {@link #next()} before accessing iterator state.
+		 * @return True iff this iterator is valid.
+		 */
+		public boolean isValid() {
+			return valid;
+		}
+
+		private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
+			for (int i = 0; i < len; ++i) {
+				int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
+				if (diff != 0) {
+					return diff;
+				}
+			}
+			return 0;
+		}
+	}
+
+	/**
+	 * Only visible for testing, DO NOT USE.
+	 */
+	public File getInstanceBasePath() {
+		return instanceBasePath;
+	}
 }
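
snapshot() now does only cheap work in the calling thread (taking the RocksDB snapshot object under the dispose lock) and defers the expensive iteration and writing to a RunnableFuture built from the new AbstractAsyncIOCallable, so the task framework can run it asynchronously and cancel it. A stripped-down sketch of that synchronous/asynchronous split using a plain FutureTask (the helper names below are placeholders, not Flink's API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;

    // Sketch of the synchronous/asynchronous split in snapshot(): the cheap part
    // runs before the method returns, the expensive part runs when the returned
    // future is executed. takeCheapSnapshotHandle/writeSnapshot are placeholders.
    public class AsyncSnapshotSketch {

        public static RunnableFuture<byte[]> snapshot(final Object dbLock) {
            // synchronous part: runs in the calling (checkpointing) thread
            final long snapshotHandle = takeCheapSnapshotHandle();

            // asynchronous part: runs when the returned future is executed
            return new FutureTask<>(new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    synchronized (dbLock) { // guard against concurrent db disposal
                        return writeSnapshot(snapshotHandle);
                    }
                }
            });
        }

        private static long takeCheapSnapshotHandle() { return 42L; }
        private static byte[] writeSnapshot(long handle) { return new byte[0]; }
    }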

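The on-stream snapshot format that writeKVStateData produces and restoreKVStateData consumes is, per key-group: an unsigned short k/v-state id, then length-prefixed key/value pairs; a key whose first bit is set (the meta-data-follows flag) signals that the next unsigned short is either another k/v-state id or the END_OF_KEY_GROUP_MARK (0xFFFF). A hedged reader sketch over a plain DataInputStream (KvPairSink is a hypothetical consumer; the 4-byte length prefix mirrors BytePrimitiveArraySerializer):

    import java.io.DataInputStream;
    import java.io.IOException;

    // Condensed sketch of the per-key-group record format written by
    // writeKVStateData and consumed by restoreKVStateData.
    public class SnapshotFormatSketch {

        static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
        static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

        interface KvPairSink { // hypothetical consumer of restored pairs
            void accept(int kvStateId, byte[] key, byte[] value);
        }

        public static void readKeyGroup(DataInputStream in, KvPairSink sink) throws IOException {
            int kvStateId = in.readUnsignedShort();
            while (true) {
                byte[] key = readByteArray(in);
                byte[] value = readByteArray(in);
                boolean metaDataFollows = (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
                key[0] &= ~FIRST_BIT_IN_BYTE_MASK; // clear the flag before handing the key on
                sink.accept(kvStateId, key, value);
                if (metaDataFollows) {
                    int next = in.readUnsignedShort();
                    if (next == END_OF_KEY_GROUP_MARK) {
                        return; // key-group exhausted
                    }
                    kvStateId = next; // pairs for the next k/v state follow
                }
            }
        }

        // stand-in for BytePrimitiveArraySerializer: 4-byte length, then the bytes
        private static byte[] readByteArray(DataInputStream in) throws IOException {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            return bytes;
        }
    }
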
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index d8f937b..beea81a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -28,7 +28,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -67,7 +66,7 @@ public class RocksDBListState<K, N, V>
 	public RocksDBListState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<V> stateDesc,
-			RocksDBKeyedStateBackend backend) {
+			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();
@@ -78,11 +77,9 @@ public class RocksDBListState<K, N, V>
 
 	@Override
 	public Iterable<V> get() {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			if (valueBytes == null) {
@@ -107,16 +104,13 @@ public class RocksDBListState<K, N, V>
 
 	@Override
 	public void add(V value) throws IOException {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
-
-			baos.reset();
-
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
+			keySerializationStream.reset();
+			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 			valueSerializer.serialize(value, out);
-			backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray());
+			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

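RocksDBListState#add is notable for being a single db.merge call: assuming a string-append style merge operator is configured on the column family (which the RocksDB list state relies on), RocksDB itself concatenates successive values under the same composed key, so appending never needs to read the existing list back. A sketch under that assumption, with serialization elided:

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    // Sketch of why RocksDBListState#add is a single merge call: the configured
    // merge operator concatenates values lazily during reads and compactions.
    public class ListAppendSketch {

        public static void append(
                RocksDB db, ColumnFamilyHandle column, WriteOptions writeOptions,
                byte[] composedKey, byte[] serializedElement) throws RocksDBException {

            // no prior get(): RocksDB folds the element into the stored list itself
            db.merge(column, writeOptions, composedKey, serializedElement);
        }
    }
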
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 15ae493..068c051 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -29,7 +29,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 /**
@@ -65,7 +64,7 @@ public class RocksDBReducingState<K, N, V>
 	public RocksDBReducingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<V> stateDesc,
-			RocksDBKeyedStateBackend backend) {
+			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();
@@ -77,11 +76,9 @@ public class RocksDBReducingState<K, N, V>
 
 	@Override
 	public V get() {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return null;
@@ -94,23 +91,22 @@ public class RocksDBReducingState<K, N, V>
 
 	@Override
 	public void add(V value) throws IOException {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 
+			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 			if (valueBytes == null) {
-				baos.reset();
+				keySerializationStream.reset();
 				valueSerializer.serialize(value, out);
-				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			} else {
 				V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				V newValue = reduceFunction.reduce(oldValue, value);
-				baos.reset();
+				keySerializationStream.reset();
 				valueSerializer.serialize(newValue, out);
-				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 62b71d9..f950751 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -221,12 +221,6 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	@Override
 	public CheckpointStreamFactory createStreamFactory(JobID jobId,
 			String operatorIdentifier) throws IOException {
-			return null;
-		}
-
-		if (fullyAsyncBackup) {
-			return performFullyAsyncSnapshot(checkpointId, timestamp);
-		} else {
 		return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
 	}
 
@@ -261,10 +255,24 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			String operatorIdentifier,
 			TypeSerializer<K> keySerializer,
 			KeyGroupAssigner<K> keyGroupAssigner,
-            KeyGroupRange keyGroupRange,
+			KeyGroupRange keyGroupRange,
 			List<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
-		throw new RuntimeException("Not implemented.");
+
+		lazyInitializeForJob(env, operatorIdentifier);
+
+		File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
+		return new RocksDBKeyedStateBackend<>(
+				jobID,
+				operatorIdentifier,
+				instanceBasePath,
+				getDbOptions(),
+				getColumnOptions(),
+				kvStateRegistry,
+				keySerializer,
+				keyGroupAssigner,
+				keyGroupRange,
+				restoredState);
 	}
 
 	// ------------------------------------------------------------------------

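With createKeyedStateBackend implemented, the backend is usable end to end. A sketch of how a job of this era would enable it (the checkpoint URI is a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch of enabling the RocksDB backend for a job (Flink 1.1-era API);
    // the checkpoint URI is a placeholder.
    public class JobSetupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10000); // checkpoint every 10s
            // state lives in local RocksDB instances; snapshots go to the given URI
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // ... build and execute the streaming topology here ...
            env.execute("rocksdb-backend-example");
        }
    }
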
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index b9c0e83..9563ed8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -24,13 +24,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 /**
@@ -63,7 +61,7 @@ public class RocksDBValueState<K, N, V>
 	public RocksDBValueState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<V> stateDesc,
-			RocksDBKeyedStateBackend backend) {
+			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();
@@ -74,11 +72,9 @@ public class RocksDBValueState<K, N, V>
 
 	@Override
 	public V value() {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
 			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return stateDesc.getDefaultValue();
@@ -95,14 +91,13 @@ public class RocksDBValueState<K, N, V>
 			clear();
 			return;
 		}
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
 		try {
-			writeKeyAndNamespace(out);
-			byte[] key = baos.toByteArray();
-			baos.reset();
+			writeCurrentKeyWithGroupAndNamespace();
+			byte[] key = keySerializationStream.toByteArray();
+			keySerializationStream.reset();
 			valueSerializer.serialize(value, out);
-			backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
+			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
@@ -110,11 +105,7 @@ public class RocksDBValueState<K, N, V>
 
 	@Override
 	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		// Serialized key and namespace is expected to be of the same format
-		// as writeKeyAndNamespace()
-		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-
-		byte[] value = backend.db.get(columnFamily, serializedKeyAndNamespace);
+		byte[] value = super.getSerializedValue(serializedKeyAndNamespace);
 
 		if (value != null) {
 			return value;

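The rewritten value() and update() above show the new buffer-reuse pattern: the backend's single keySerializationStream serves both key and value serialization, replacing the two ByteArrayOutputStream allocations per access. The same pattern as a standalone sketch (all identifiers are placeholders for the backend's fields):

    // Sketch of the buffer-reuse pattern; keySerializer, valueSerializer,
    // db, columnFamily and writeOptions stand in for the backend's fields.
    ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(buffer);

    keySerializer.serialize(key, out);   // key bytes (group, key, namespace) go in first
    byte[] keyBytes = buffer.toByteArray();
    buffer.reset();                      // rewind and reuse the same buffer for the value
    valueSerializer.serialize(value, out);
    db.put(columnFamily, writeOptions, keyBytes, buffer.toByteArray());
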
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
deleted file mode 100644
index 0e35b60..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for asynchronous RocksDB Key/Value state checkpoints.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-@SuppressWarnings("serial")
-public class RocksDBAsyncKVSnapshotTest {
-
-	@Before
-	public void checkOperatingSystem() {
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
-	 *
-	 * <p>We use latches to block at various stages and see if the code still continues through
-	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
-	 * test will simply lock forever.
-	 */
-	@Test
-	public void testAsyncCheckpoints() throws Exception {
-		LocalFileSystem localFS = new LocalFileSystem();
-		localFS.initialize(new URI("file:///"), new Configuration());
-		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
-
-		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
-			@Override
-			public String getKey(String value) throws Exception {
-				return value;
-			}
-		}, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
-		backend.setDbStoragePath(dbDir.getAbsolutePath());
-
-		streamConfig.setStateBackend(backend);
-
-		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
-		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-			testHarness.jobConfig,
-			testHarness.taskConfig,
-			testHarness.memorySize,
-			new MockInputSplitProvider(),
-			testHarness.bufferSize) {
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId) {
-				super.acknowledgeCheckpoint(checkpointId);
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId,
-					ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-					List<KeyGroupsStateHandle> keyGroupStateHandles) {
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
-
-				// block on the latch, to verify that triggerCheckpoint returns below,
-				// even though the async checkpoint would not finish
-				try {
-					delayCheckpointLatch.await();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-
-
-				// should be only one k/v state
-
-				assertEquals(1, keyGroupStateHandles.size());
-
-				// we now know that the checkpoint went through
-				ensureCheckpointLatch.trigger();
-			}
-		};
-
-		testHarness.invoke(mockEnv);
-
-		// wait for the task to be running
-		for (Field field: StreamTask.class.getDeclaredFields()) {
-			if (field.getName().equals("isRunning")) {
-				field.setAccessible(true);
-				while (!field.getBoolean(task)) {
-					Thread.sleep(10);
-				}
-
-			}
-		}
-
-		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
-		task.triggerCheckpoint(42, 17);
-
-		// now we allow the checkpoint
-		delayCheckpointLatch.trigger();
-
-		// wait for the checkpoint to go through
-		ensureCheckpointLatch.await();
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-	}
-
-	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
-	 *
-	 * <p>We use latches to block at various stages and see if the code still continues through
-	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
-	 * test will simply lock forever.
-	 */
-	@Test
-	public void testFullyAsyncCheckpoints() throws Exception {
-		LocalFileSystem localFS = new LocalFileSystem();
-		localFS.initialize(new URI("file:///"), new Configuration());
-		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
-
-		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
-		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
-			@Override
-			public String getKey(String value) throws Exception {
-				return value;
-			}
-		}, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
-		backend.setDbStoragePath(dbDir.getAbsolutePath());
-//		backend.enableFullyAsyncSnapshots();
-
-		streamConfig.setStateBackend(backend);
-
-		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-
-		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-				testHarness.jobConfig,
-				testHarness.taskConfig,
-				testHarness.memorySize,
-				new MockInputSplitProvider(),
-				testHarness.bufferSize) {
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId) {
-				super.acknowledgeCheckpoint(checkpointId);
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId,
-					ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-					List<KeyGroupsStateHandle> keyGroupStateHandles) {
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
-
-				// block on the latch, to verify that triggerCheckpoint returns below,
-				// even though the async checkpoint would not finish
-				try {
-					delayCheckpointLatch.await();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-
-				// should be only one k/v state
-				assertEquals(1, keyGroupStateHandles.size());
-
-				// we now know that the checkpoint went through
-				ensureCheckpointLatch.trigger();
-			}
-		};
-
-		testHarness.invoke(mockEnv);
-
-		// wait for the task to be running
-		for (Field field: StreamTask.class.getDeclaredFields()) {
-			if (field.getName().equals("isRunning")) {
-				field.setAccessible(true);
-				while (!field.getBoolean(task)) {
-					Thread.sleep(10);
-				}
-
-			}
-		}
-
-		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
-		task.triggerCheckpoint(42, 17);
-
-		// now we allow the checkpoint
-		delayCheckpointLatch.trigger();
-
-		// wait for the checkpoint to go through
-		ensureCheckpointLatch.await();
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-	}
-
-
-	// ------------------------------------------------------------------------
-
-	public static class AsyncCheckpointOperator
-		extends AbstractStreamOperator<String>
-		implements OneInputStreamOperator<String, String> {
-
-		@Override
-		public void open() throws Exception {
-			super.open();
-
-			// also get the state in open, this way we are sure that it was created before
-			// we trigger the test checkpoint
-			ValueState<String> state = getPartitionedState(
-					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE,
-					new ValueStateDescriptor<>("count",
-							StringSerializer.INSTANCE, "hello"));
-
-		}
-
-		@Override
-		public void processElement(StreamRecord<String> element) throws Exception {
-			// we also don't care
-
-			ValueState<String> state = getPartitionedState(
-					VoidNamespace.INSTANCE,
-					VoidNamespaceSerializer.INSTANCE,
-					new ValueStateDescriptor<>("count",
-							StringSerializer.INSTANCE, "hello"));
-
-			state.update(element.getValue());
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			// not interested
-		}
-	}
-
-	public static class DummyMapFunction<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) { return value; }
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
new file mode 100644
index 0000000..624905c
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for asynchronous RocksDB Key/Value state checkpoints.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class, FileSystem.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+@SuppressWarnings("serial")
+public class RocksDBAsyncSnapshotTest {
+
+	@Before
+	public void checkOperatingSystem() {
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
+	/**
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
+	 *
+	 * <p>We use latches to block at various stages and see if the code still continues through
+	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+	 * test will simply lock forever.
+	 */
+	@Test
+	public void testFullyAsyncSnapshot() throws Exception {
+
+		LocalFileSystem localFS = new LocalFileSystem();
+		localFS.initialize(new URI("file:///"), new Configuration());
+		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
+
+		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
+			@Override
+			public String getKey(String value) throws Exception {
+				return value;
+			}
+		}, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
+
+		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
+		backend.setDbStoragePath(dbDir.getAbsolutePath());
+
+		streamConfig.setStateBackend(backend);
+
+		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+
+		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+
+		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize) {
+
+			@Override
+			public void acknowledgeCheckpoint(long checkpointId) {
+				super.acknowledgeCheckpoint(checkpointId);
+			}
+
+			@Override
+			public void acknowledgeCheckpoint(long checkpointId,
+			                                  ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			                                  List<KeyGroupsStateHandle> keyGroupStateHandles) {
+				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
+
+				// block on the latch, to verify that triggerCheckpoint returns below,
+				// even though the async checkpoint would not finish
+				try {
+					delayCheckpointLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+
+				// should be only one k/v state
+				assertEquals(1, keyGroupStateHandles.size());
+
+				// we now know that the checkpoint went through
+				ensureCheckpointLatch.trigger();
+			}
+		};
+
+		testHarness.invoke(mockEnv);
+
+		// wait for the task to be running
+		for (Field field: StreamTask.class.getDeclaredFields()) {
+			if (field.getName().equals("isRunning")) {
+				field.setAccessible(true);
+				while (!field.getBoolean(task)) {
+					Thread.sleep(10);
+				}
+
+			}
+		}
+
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+		task.triggerCheckpoint(42, 17);
+
+		// now we allow the checkpoint
+		delayCheckpointLatch.trigger();
+
+		// wait for the checkpoint to go through
+		ensureCheckpointLatch.await();
+
+		testHarness.endInput();
+		testHarness.waitForTaskCompletion();
+	}
+
+	/**
+	 * This test ensures that canceling asynchronous snapshots works as expected and does not block.
+	 * @throws Exception on test failure
+	 */
+	@Test
+	public void testCancelFullyAsyncCheckpoints() throws Exception {
+		LocalFileSystem localFS = new LocalFileSystem();
+		localFS.initialize(new URI("file:///"), new Configuration());
+		PowerMockito.stub(PowerMockito.method(FileSystem.class, "get", URI.class, Configuration.class)).toReturn(localFS);
+
+		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
+
+		//ensure that the async threads complete before invoke method of the tasks returns.
+		task.setThreadPoolTerminationTimeout(Long.MAX_VALUE);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.configureForKeyedStream(new KeySelector<String, String>() {
+			@Override
+			public String getKey(String value) throws Exception {
+				return value;
+			}
+		}, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		File dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
+		File chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
+
+		BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
+
+		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), memoryStateBackend);
+		backend.setDbStoragePath(dbDir.getAbsolutePath());
+
+		streamConfig.setStateBackend(backend);
+
+		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+
+		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+				testHarness.jobConfig,
+				testHarness.taskConfig,
+				testHarness.memorySize,
+				new MockInputSplitProvider(),
+				testHarness.bufferSize);
+
+		testHarness.invoke(mockEnv);
+
+		// wait for the task to be running
+		for (Field field: StreamTask.class.getDeclaredFields()) {
+			if (field.getName().equals("isRunning")) {
+				field.setAccessible(true);
+				while (!field.getBoolean(task)) {
+					Thread.sleep(10);
+				}
+
+			}
+		}
+
+		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+		task.triggerCheckpoint(42, 17);
+
+		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
+		task.cancel();
+
+		BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
+
+		testHarness.endInput();
+		try {
+			testHarness.waitForTaskCompletion();
+			Assert.fail("Operation completed. Cancel failed.");
+		} catch (Exception expected) {
+			// we expect the exception from canceling snapshots
+			Throwable cause = expected.getCause();
+			if (cause instanceof AsynchronousException) {
+				AsynchronousException asynchronousException = (AsynchronousException) cause;
+				cause = asynchronousException.getCause();
+				Assert.assertTrue("Unexpected Exception: " + cause,
+						cause instanceof CancellationException // future canceled
+						|| cause instanceof InterruptedException); // thread interrupted
+
+			} else {
+				Assert.fail();
+			}
+		}
+	}
+
+	@Test
+	public void testConsistentSnapshotSerializationFlagsAndMasks() {
+
+		Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK);
+		Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+
+		byte[] expectedKey = new byte[] {42, 42};
+		byte[] modKey = expectedKey.clone();
+
+		Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		RocksDBKeyedStateBackend.RocksDBSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
+		Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		RocksDBKeyedStateBackend.RocksDBSnapshotOperation.clearMetaDataFollowsFlag(modKey);
+		Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+		Assert.assertTrue(Arrays.equals(expectedKey, modKey));
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a CheckpointStateOutputStream whose write operations block on a latch, delaying the writing of snapshots.
+	 */
+	static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
+
+		public static OneShotLatch waitFirstWriteLatch = new OneShotLatch();
+
+		public static OneShotLatch unblockCancelLatch = new OneShotLatch();
+
+		volatile boolean closed = false;
+
+		@Override
+		public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
+			return new MemCheckpointStreamFactory(4 * 1024 * 1024) {
+				@Override
+				public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+
+					return new MemoryCheckpointOutputStream(4 * 1024 * 1024) {
+						@Override
+						public void write(int b) throws IOException {
+							waitFirstWriteLatch.trigger();
+							try {
+								unblockCancelLatch.await();
+							} catch (InterruptedException e) {
+								Thread.currentThread().interrupt();
+							}
+							if (closed) {
+								throw new IOException("Stream closed.");
+							}
+							super.write(b);
+						}
+
+						@Override
+						public void write(byte[] b, int off, int len) throws IOException {
+							waitFirstWriteLatch.trigger();
+							try {
+								unblockCancelLatch.await();
+							} catch (InterruptedException e) {
+								Thread.currentThread().interrupt();
+							}
+							if (closed) {
+								throw new IOException("Stream closed.");
+							}
+							super.write(b, off, len);
+						}
+
+						@Override
+						public void close() {
+							closed = true;
+							super.close();
+						}
+					};
+				}
+			};
+		}
+	}
+
+	public static class AsyncCheckpointOperator
+		extends AbstractStreamOperator<String>
+		implements OneInputStreamOperator<String, String> {
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			// also get the state in open, this way we are sure that it was created before
+			// we trigger the test checkpoint
+			ValueState<String> state = getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("count",
+							StringSerializer.INSTANCE, "hello"));
+
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			// the element value itself does not matter; we only exercise state access
+
+			ValueState<String> state = getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					new ValueStateDescriptor<>("count",
+							StringSerializer.INSTANCE, "hello"));
+
+			state.update(element.getValue());
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			// not interested
+		}
+	}
+
+	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) { return value; }
+	}
+}
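
The flag constants asserted in testConsistentSnapshotSerializationFlagsAndMasks() encode per-key metadata in the snapshot stream: the most significant bit of a key's first byte marks that metadata follows. A plausible reconstruction of the three helpers the test exercises (the committed implementations live in RocksDBKeyedStateBackend.RocksDBSnapshotOperation; this sketch only illustrates the bit manipulation):

    // Illustrative reconstruction, not the committed implementation.
    static final int FIRST_BIT_IN_BYTE_MASK = 0x80;

    static void setMetaDataFollowsFlagInKey(byte[] key) {
        key[0] |= FIRST_BIT_IN_BYTE_MASK;      // set the MSB of the first key byte
    }

    static void clearMetaDataFollowsFlag(byte[] key) {
        key[0] &= ~FIRST_BIT_IN_BYTE_MASK;     // clear it again
    }

    static boolean hasMetaDataFollowsFlag(byte[] key) {
        return (key[0] & FIRST_BIT_IN_BYTE_MASK) != 0;
    }

Because 0x80 touches only the top bit, setting and then clearing the flag restores the original key bytes, which is exactly what the test's final Arrays.equals assertion verifies.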