Posted to commits@flink.apache.org by al...@apache.org on 2016/04/20 16:07:25 UTC

[2/2] flink git commit: [FLINK-3718] Add Option For Completely Async Backup in RocksDB State Backend

[FLINK-3718] Add Option For Completely Async Backup in RocksDB State Backend

This also refactors the RocksDB backend to keep one RocksDB database in
the backend where all key/value state is stored. Individual named
key/value states get a reference to the db and store their state in a
column family. This way, we only have to back up one RocksDB database
and can centrally decide how to do backups.
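
The following sketch (not part of the commit; class and variable names are
illustrative) shows the column-family-per-state layout the message describes:
one shared RocksDB instance, with each named k/v state mapped to its own
column family.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class SharedDbLayoutSketch {

        private final RocksDB db;
        private final Map<String, ColumnFamilyHandle> statesToColumnFamilies = new HashMap<>();

        public SharedDbLayoutSketch(String path) throws RocksDBException {
            RocksDB.loadLibrary();
            // RocksDB requires the "default" column family to be listed on open
            List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
            descriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
            List<ColumnFamilyHandle> handles = new ArrayList<>();
            db = RocksDB.open(new DBOptions().setCreateIfMissing(true), path, descriptors, handles);
        }

        // One column family per named state; a single backup of "db" then
        // covers every state, which is what makes central backups possible.
        public ColumnFamilyHandle columnFamilyFor(String stateName) throws RocksDBException {
            ColumnFamilyHandle handle = statesToColumnFamilies.get(stateName);
            if (handle == null) {
                handle = db.createColumnFamily(new ColumnFamilyDescriptor(stateName.getBytes()));
                statesToColumnFamilies.put(stateName, handle);
            }
            return handle;
        }
    }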


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c33325e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c33325e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c33325e3

Branch: refs/heads/master
Commit: c33325e3ffb9acecfc9c562b21a3ac703bae983d
Parents: 77f1a2e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Apr 8 14:58:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Apr 20 16:06:41 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   | 381 +---------
 .../contrib/streaming/state/OptionsFactory.java |  21 +-
 .../streaming/state/PredefinedOptions.java      |  70 +-
 .../streaming/state/RocksDBFoldingState.java    | 106 +--
 .../streaming/state/RocksDBListState.java       | 101 +--
 .../streaming/state/RocksDBReducingState.java   | 102 +--
 .../streaming/state/RocksDBStateBackend.java    | 740 +++++++++++++++++--
 .../streaming/state/RocksDBValueState.java      | 101 +--
 .../FullyAsyncRocksDBStateBackendTest.java      |  65 ++
 .../state/RocksDBAsyncKVSnapshotTest.java       | 106 ++-
 .../state/RocksDBStateBackendConfigTest.java    |  40 +-
 .../common/state/FoldingStateDescriptor.java    |  31 +
 .../api/common/state/ListStateDescriptor.java   |  29 +
 .../common/state/ReducingStateDescriptor.java   |  30 +
 .../flink/api/common/state/StateDescriptor.java |  23 +-
 .../api/common/state/ValueStateDescriptor.java  |  31 +
 .../flink/hdfstests/FileStateBackendTest.java   |  15 +-
 .../runtime/state/AbstractStateBackend.java     |   8 +-
 .../runtime/state/FileStateBackendTest.java     |  13 +
 .../runtime/state/MemoryStateBackendTest.java   |  13 +
 .../runtime/state/StateBackendTestBase.java     | 627 ++++++++--------
 .../EventTimeWindowCheckpointingITCase.java     |  16 +-
 22 files changed, 1536 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 74e0509..ec58a63 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -17,195 +17,64 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
-
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import org.apache.flink.streaming.util.HDFSCopyFromLocal;
-import org.apache.flink.streaming.util.HDFSCopyToLocal;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.rocksdb.BackupEngine;
-import org.rocksdb.BackupableDBOptions;
-import org.rocksdb.Env;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RestoreOptions;
-import org.rocksdb.RocksDB;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.util.UUID;
-
-import static java.util.Objects.requireNonNull;
 
 /**
  * Base class for {@link State} implementations that store state in a RocksDB database.
  *
- * <p>This base class is responsible for setting up the RocksDB database, for
- * checkpointing/restoring the database and for disposal in the {@link #dispose()} method. The
- * concrete subclasses just use the RocksDB handle to store/retrieve state.
- *
- * <p>State is checkpointed asynchronously. The synchronous part is drawing the actual backup
- * from RocksDB, this is done in {@link #snapshot(long, long)}. This will return a
- * {@link AsyncRocksDBSnapshot} that will perform the copying of the backup to the remote
- * file system.
+ * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that
+ * the {@link RocksDBStateBackend} manages and checkpoints.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <S> The type of {@link State}.
  * @param <SD> The type of {@link StateDescriptor}.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
-	implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
+abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
+		implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
 
-	/** Serializer for the keys */
-	protected final TypeSerializer<K> keySerializer;
-
 	/** Serializer for the namespace */
-	protected final TypeSerializer<N> namespaceSerializer;
-
-	/** The current key, which the next value methods will refer to */
-	protected K currentKey;
+	private final TypeSerializer<N> namespaceSerializer;
 
 	/** The current namespace, which the next value methods will refer to */
-	protected N currentNamespace;
-
-	/** Store it so that we can clean up in dispose() */
-	protected final File basePath;
+	private N currentNamespace;
 
-	/** FileSystem path where checkpoints are stored */
-	protected final String checkpointPath;
+	/** Backend that holds the actual RocksDB instance where we store state */
+	protected RocksDBStateBackend backend;
 
-	/** Directory in "basePath" where the actual RocksDB data base instance stores its files */
-	protected final File rocksDbPath;
-
-	/** Our RocksDB instance */
-	protected final RocksDB db;
+	/** The column family of this particular instance of state */
+	ColumnFamilyHandle columnFamily;
 
 	/**
 	 * Creates a new RocksDB backed state.
 	 *
-	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param basePath The path on the local system where RocksDB data should be stored.
 	 */
-	protected AbstractRocksDBState(
-			TypeSerializer<K> keySerializer,
+	AbstractRocksDBState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
-			File basePath,
-			String checkpointPath,
-			Options options) {
-
-		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+			RocksDBStateBackend backend) {
 
-		this.keySerializer = requireNonNull(keySerializer);
 		this.namespaceSerializer = namespaceSerializer;
-		this.basePath = basePath;
-		this.checkpointPath = checkpointPath;
-
-		RocksDB.loadLibrary();
-
-		if (!basePath.exists()) {
-			if (!basePath.mkdirs()) {
-				throw new RuntimeException("Could not create RocksDB data directory.");
-			}
-		}
-
-		// clean it, this will remove the last part of the path but RocksDB will recreate it
-		try {
-			if (rocksDbPath.exists()) {
-				LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
-				FileUtils.deleteDirectory(rocksDbPath);
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
-		}
-
-		try {
-			db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
-		} catch (RocksDBException e) {
-			throw new RuntimeException("Error while opening RocksDB instance.", e);
-		}
+		this.backend = backend;
 
-	}
-
-	/**
-	 * Creates a new RocksDB backed state and restores from the given backup directory. After
-	 * restoring the backup directory is deleted.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param basePath The path on the local system where RocksDB data should be stored.
-	 * @param restorePath The path to a backup directory from which to restore RocksDb database.
-	 */
-	protected AbstractRocksDBState(
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			File basePath,
-			String checkpointPath,
-			String restorePath,
-			Options options) {
-
-		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
-
-		RocksDB.loadLibrary();
-
-		// clean it, this will remove the last part of the path but RocksDB will recreate it
-		try {
-			if (rocksDbPath.exists()) {
-				LOG.warn("Deleting already existing db directory {}.", rocksDbPath);
-				FileUtils.deleteDirectory(rocksDbPath);
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
-		}
-
-		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"))) {
-			backupEngine.restoreDbFromLatestBackup(rocksDbPath.getAbsolutePath(), rocksDbPath.getAbsolutePath(), new RestoreOptions(true));
-		} catch (RocksDBException|IllegalArgumentException e) {
-			throw new RuntimeException("Error while restoring RocksDB state from " + restorePath, e);
-		} finally {
-			try {
-				FileUtils.deleteDirectory(new File(restorePath));
-			} catch (IOException e) {
-				LOG.error("Error cleaning up local restore directory " + restorePath, e);
-			}
-		}
-
-		this.keySerializer = requireNonNull(keySerializer);
-		this.namespaceSerializer = namespaceSerializer;
-		this.basePath = basePath;
-		this.checkpointPath = checkpointPath;
-
-		if (!basePath.exists()) {
-			if (!basePath.mkdirs()) {
-				throw new RuntimeException("Could not create RocksDB data directory.");
-			}
-		}
-
-		try {
-			db = RocksDB.open(options, rocksDbPath.getAbsolutePath());
-		} catch (RocksDBException e) {
-			throw new RuntimeException("Error while opening RocksDB instance.", e);
-		}
+		this.columnFamily = columnFamily;
 	}
 
 	// ------------------------------------------------------------------------
@@ -217,234 +86,38 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			db.remove(key);
+			backend.db.remove(columnFamily, key);
 		} catch (IOException|RocksDBException e) {
 			throw new RuntimeException("Error while removing entry from RocksDB", e);
 		}
 	}
 
-	protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
-		keySerializer.serialize(currentKey, out);
+	void writeKeyAndNamespace(DataOutputView out) throws IOException {
+		backend.keySerializer().serialize(backend.currentKey(), out);
 		out.writeByte(42);
 		namespaceSerializer.serialize(currentNamespace, out);
 	}
 
 	@Override
-	final public void setCurrentKey(K currentKey) {
-		this.currentKey = currentKey;
-	}
-
-	@Override
 	final public void setCurrentNamespace(N namespace) {
 		this.currentNamespace = namespace;
 	}
 
-	protected abstract AbstractRocksDBSnapshot<K, N, S, SD> createRocksDBSnapshot(URI backupUri, long checkpointId);
-
-	@Override
-	public final KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(final long checkpointId, long timestamp) throws Exception {
-
-		final File localBackupPath = new File(basePath, "local-chk-" + checkpointId);
-		final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId);
-
-
-		if (!localBackupPath.exists()) {
-			if (!localBackupPath.mkdirs()) {
-				throw new RuntimeException("Could not create local backup path " + localBackupPath);
-			}
-		}
-
-		long startTime = System.currentTimeMillis();
-		BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
-		// we disabled the WAL
-		backupOptions.setBackupLogFiles(false);
-		// no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
-		backupOptions.setSync(false);
-		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(),
-				backupOptions)) {
-			// make sure to flush because we don't write to the write-ahead-log
-			db.flush(new FlushOptions().setWaitForFlush(true));
-			backupEngine.createNewBackup(db);
-		}
-		long endTime = System.currentTimeMillis();
-		LOG.info("RocksDB (" + rocksDbPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
-
-		return new AsyncRocksDBSnapshot<>(
-			localBackupPath,
-			backupUri,
-			checkpointId,
-			this);
-	}
-
 	@Override
 	final public void dispose() {
-		db.dispose();
-		try {
-			FileUtils.deleteDirectory(basePath);
-		} catch (IOException e) {
-			throw new RuntimeException("Error disposing RocksDB data directory.", e);
-		}
+		// ignore because we don't hold any state ourselves
 	}
 
-	protected static abstract class AbstractRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>>
-			implements KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-
-		private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
-
-		// ------------------------------------------------------------------------
-		//  Ctor parameters for RocksDB state
-		// ------------------------------------------------------------------------
-
-		/** Store it so that we can clean up in dispose() */
-		protected final File basePath;
-
-		/** Where we should put RocksDB backups */
-		protected final String checkpointPath;
-
-		// ------------------------------------------------------------------------
-		//  Info about this checkpoint
-		// ------------------------------------------------------------------------
-
-		protected final URI backupUri;
-
-		protected long checkpointId;
-
-		// ------------------------------------------------------------------------
-		//  For sanity checks
-		// ------------------------------------------------------------------------
-
-		/** Key serializer */
-		protected final TypeSerializer<K> keySerializer;
-
-		/** Namespace serializer */
-		protected final TypeSerializer<N> namespaceSerializer;
-
-		/** Hash of the StateDescriptor, for sanity checks */
-		protected final SD stateDesc;
-
-		/**
-		 * Creates a new snapshot from the given state parameters.
-		 */
-		public AbstractRocksDBSnapshot(File basePath,
-				String checkpointPath,
-				URI backupUri,
-				long checkpointId,
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				SD stateDesc) {
-			
-			this.basePath = basePath;
-			this.checkpointPath = checkpointPath;
-			this.backupUri = backupUri;
-			this.checkpointId = checkpointId;
-
-			this.stateDesc = stateDesc;
-			this.keySerializer = keySerializer;
-			this.namespaceSerializer = namespaceSerializer;
-		}
-
-		/**
-		 * Subclasses must implement this for creating a concrete RocksDB state.
-		 */
-		protected abstract KvState<K, N, S, SD, RocksDBStateBackend> createRocksDBState(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				SD stateDesc,
-				File basePath,
-				String backupPath,
-				String restorePath,
-				Options options) throws Exception;
-
-		@Override
-		public final KvState<K, N, S, SD, RocksDBStateBackend> restoreState(
-				RocksDBStateBackend stateBackend,
-				TypeSerializer<K> keySerializer,
-				ClassLoader classLoader,
-				long recoveryTimestamp) throws Exception {
-
-			// validity checks
-			if (!this.keySerializer.equals(keySerializer)) {
-				throw new IllegalArgumentException(
-					"Cannot restore the state from the snapshot with the given serializers. " +
-						"State (K/V) was serialized with " +
-						"(" + keySerializer + ") " +
-						"now is (" + keySerializer + ")");
-			}
-
-			if (!basePath.exists()) {
-				if (!basePath.mkdirs()) {
-					throw new RuntimeException("Could not create RocksDB base path " + basePath);
-				}
-			}
-
-			final File localBackupPath = new File(basePath, "chk-" + checkpointId);
-
-			if (localBackupPath.exists()) {
-				try {
-					LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
-					FileUtils.deleteDirectory(localBackupPath);
-				} catch (IOException e) {
-					throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
-				}
-			}
-
-			HDFSCopyToLocal.copyToLocal(backupUri, basePath);
-			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, basePath,
-					checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
-		}
-
-		@Override
-		public final void discardState() throws Exception {
-			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-			fs.delete(new Path(backupUri), true);
-		}
+	@Override
+	public void setCurrentKey(K key) {
+		// ignore because we don't hold any state ourselves
 
-		@Override
-		public final long getStateSize() throws Exception {
-			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-			return fs.getContentSummary(new Path(backupUri)).getLength();
-		}
 	}
 
-	/**
-	 * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is
-	 * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}
-	 * of this class.
-	 */
-	private static class AsyncRocksDBSnapshot<K, N, S extends State, SD extends StateDescriptor<S, ?>> extends AsynchronousKvStateSnapshot<K, N, S, SD, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-		private final File localBackupPath;
-		private final URI backupUri;
-		private final long checkpointId;
-		private transient AbstractRocksDBState<K, N, S, SD> state;
-
-		public AsyncRocksDBSnapshot(File localBackupPath,
-				URI backupUri,
-				long checkpointId,
-				AbstractRocksDBState<K, N, S, SD> state) {
-				this.localBackupPath = localBackupPath;
-				this.backupUri = backupUri;
-				this.checkpointId = checkpointId;
-			this.state = state;
-		}
-
-		@Override
-		public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
-				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-				long endTime = System.currentTimeMillis();
-				LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return state.createRocksDBSnapshot(backupUri, checkpointId);
-			} catch (Exception e) {
-				FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-				fs.delete(new Path(backupUri), true);
-				throw e;
-			} finally {
-				FileUtils.deleteQuietly(localBackupPath);
-			}
-		}
+	@Override
+	public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
+			long timestamp) throws Exception {
+		throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
 	}
 }
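
For orientation, a state access against the shared database now has two steps:
build a composite RocksDB key from the current key and namespace (this is what
writeKeyAndNamespace() above does, with byte 42 as the separator), then go
through the backend's db handle scoped to this state's column family. A hedged
sketch of a read, with serializeCurrentKeyAndNamespace() as a hypothetical
stand-in for the stream setup shown in the diff:

    // build <serialized key> 0x2A <serialized namespace>, as in writeKeyAndNamespace()
    byte[] key = serializeCurrentKeyAndNamespace();
    // all reads/writes go through the backend's shared RocksDB instance,
    // restricted to this state's column family
    byte[] valueBytes = backend.db.get(columnFamily, key);
    if (valueBytes == null) {
        return stateDesc.getDefaultValue();
    }
    return valueSerializer.deserialize(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));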
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 3e52f1f..863c5da 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
 
 /**
- * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
+ * A factory for {@link DBOptions} to be passed to the {@link RocksDBStateBackend}.
  * Options have to be created lazily by this factory, because the {@code Options}
  * class is not serializable and holds pointers to native code.
  * 
@@ -55,5 +56,19 @@ public interface OptionsFactory extends java.io.Serializable {
 	 * @param currentOptions The options object with the pre-defined options. 
 	 * @return The options object on which the additional options are set.
 	 */
-	Options createOptions(Options currentOptions);
+	DBOptions createDBOptions(DBOptions currentOptions);
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @return The options object on which the additional options are set.
+	 */
+	ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
+
 }
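
A minimal implementation of the extended interface, following the contract
described above (set options on the passed-in objects and return them, so the
pre-defined options survive); the concrete option values are only examples:

    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class ExampleOptionsFactory implements OptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // set additional options on top of the pre-defined ones
            return currentOptions.setMaxBackgroundFlushes(2);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions.setWriteBufferSize(32 * 1024 * 1024);
        }
    }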

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index c19b54f..93aac85 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -19,8 +19,10 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
-import org.rocksdb.Options;
+import org.rocksdb.DBOptions;
+import org.rocksdb.StringAppendOperator;
 
 /**
  * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. 
@@ -42,11 +44,18 @@ public enum PredefinedOptions {
 	DEFAULT {
 		
 		@Override
-		public Options createOptions() {
-			return new Options()
+		public DBOptions createDBOptions() {
+			return new DBOptions()
 					.setUseFsync(false)
 					.setDisableDataSync(true);
 		}
+
+		@Override
+		public ColumnFamilyOptions createColumnOptions() {
+			return new ColumnFamilyOptions()
+					.setMergeOperator(new StringAppendOperator());
+		}
+
 	},
 
 	/**
@@ -72,16 +81,22 @@ public enum PredefinedOptions {
 	SPINNING_DISK_OPTIMIZED {
 
 		@Override
-		public Options createOptions() {
+		public DBOptions createDBOptions() {
 
-			return new Options()
-					.setCompactionStyle(CompactionStyle.LEVEL)
-					.setLevelCompactionDynamicLevelBytes(true)
+			return new DBOptions()
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
 					.setDisableDataSync(true)
 					.setMaxOpenFiles(-1);
 		}
+
+		@Override
+		public ColumnFamilyOptions createColumnOptions() {
+			return new ColumnFamilyOptions()
+					.setMergeOperator(new StringAppendOperator())
+					.setCompactionStyle(CompactionStyle.LEVEL)
+					.setLevelCompactionDynamicLevelBytes(true);
+		}
 	},
 
 	/**
@@ -113,25 +128,32 @@ public enum PredefinedOptions {
 	SPINNING_DISK_OPTIMIZED_HIGH_MEM {
 
 		@Override
-		public Options createOptions() {
+		public DBOptions createDBOptions() {
+
+			return new DBOptions()
+					.setIncreaseParallelism(4)
+					.setUseFsync(false)
+					.setDisableDataSync(true)
+					.setMaxOpenFiles(-1);
+		}
+
+		@Override
+		public ColumnFamilyOptions createColumnOptions() {
 
 			final long blockCacheSize = 256 * 1024 * 1024;
 			final long blockSize = 128 * 1024;
 			final long targetFileSize = 256 * 1024 * 1024;
 			final long writeBufferSize = 64 * 1024 * 1024;
 
-			return new Options()
+			return new ColumnFamilyOptions()
+					.setMergeOperator(new StringAppendOperator())
 					.setCompactionStyle(CompactionStyle.LEVEL)
 					.setLevelCompactionDynamicLevelBytes(true)
 					.setTargetFileSizeBase(targetFileSize)
 					.setMaxBytesForLevelBase(4 * targetFileSize)
 					.setWriteBufferSize(writeBufferSize)
-					.setIncreaseParallelism(4)
 					.setMinWriteBufferNumberToMerge(3)
 					.setMaxWriteBufferNumber(4)
-					.setUseFsync(false)
-					.setDisableDataSync(true)
-					.setMaxOpenFiles(-1)
 					.setTableFormatConfig(
 							new BlockBasedTableConfig()
 									.setBlockCacheSize(blockCacheSize)
@@ -160,21 +182,35 @@ public enum PredefinedOptions {
 	FLASH_SSD_OPTIMIZED {
 
 		@Override
-		public Options createOptions() {
-			return new Options()
+		public DBOptions createDBOptions() {
+			return new DBOptions()
 					.setIncreaseParallelism(4)
 					.setUseFsync(false)
 					.setDisableDataSync(true)
 					.setMaxOpenFiles(-1);
 		}
+
+		@Override
+		public ColumnFamilyOptions createColumnOptions() {
+			return new ColumnFamilyOptions()
+					.setMergeOperator(new StringAppendOperator());
+		}
 	};
 	
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates the options for this pre-defined setting.
+	 * Creates the {@link DBOptions} for this pre-defined setting.
 	 * 
 	 * @return The pre-defined options object. 
 	 */
-	public abstract Options createOptions();
+	public abstract DBOptions createDBOptions();
+
+	/**
+	 * Creates the {@link org.rocksdb.ColumnFamilyOptions} for this pre-defined setting.
+	 *
+	 * @return The pre-defined options object.
+	 */
+	public abstract ColumnFamilyOptions createColumnOptions();
+
 }
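
Typical wiring of these profiles (hedged: setPredefinedOptions() and
setOptions() are assumed here, they are not part of this excerpt; the
checkpoint URI is a placeholder, and ExampleOptionsFactory is the sketch
from the OptionsFactory section above):

    RocksDBStateBackend backend = new RocksDBStateBackend(
            "hdfs:///flink/checkpoints",
            new FsStateBackend("hdfs:///flink/checkpoints"));
    backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    // an OptionsFactory can then refine the pre-defined DBOptions/ColumnFamilyOptions
    backend.setOptions(new ExampleOptionsFactory());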

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 20b5181..91fa807 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
 
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 
 import static java.util.Objects.requireNonNull;
 
@@ -46,7 +43,7 @@ import static java.util.Objects.requireNonNull;
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
  */
-public class RocksDBFoldingState<K, N, T, ACC>
+class RocksDBFoldingState<K, N, T, ACC>
 	extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
 	implements FoldingState<T, ACC> {
 
@@ -54,7 +51,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	private final TypeSerializer<ACC> valueSerializer;
 
 	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final FoldingStateDescriptor<T, ACC> stateDesc;
+	private final FoldingStateDescriptor<T, ACC> stateDesc;
 
 	/** User-specified fold function */
 	private final FoldFunction<T, ACC> foldFunction;
@@ -63,59 +60,23 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	 * We disable writes to the write-ahead-log here. We can't have these in the base class
 	 * because JNI segfaults for some reason if they are.
 	 */
-	protected final WriteOptions writeOptions;
+	private final WriteOptions writeOptions;
 
 	/**
 	 * Creates a new {@code RocksDBFoldingState}.
 	 *
-	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
 	 */
-	protected RocksDBFoldingState(
-			TypeSerializer<K> keySerializer,
+	RocksDBFoldingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc,
-			File dbPath,
-			String backupPath,
-			Options options) {
-		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
-		
-		this.stateDesc = requireNonNull(stateDesc);
-		this.valueSerializer = stateDesc.getSerializer();
-		this.foldFunction = stateDesc.getFoldFunction();
-
-		writeOptions = new WriteOptions();
-		writeOptions.setDisableWAL(true);
-	}
+			RocksDBStateBackend backend) {
 
-	/**
-	 * Creates a {@code RocksDBFoldingState} by restoring from a directory.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
-	 * @param restorePath The path on the local file system that we are restoring from.
-	 */
-	protected RocksDBFoldingState(
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath,
-			Options options) {
-		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
+		super(columnFamily, namespaceSerializer, backend);
 		
-		this.stateDesc = stateDesc;
+		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.foldFunction = stateDesc.getFoldFunction();
 
@@ -130,7 +91,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			byte[] valueBytes = db.get(key);
+			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return stateDesc.getDefaultValue();
 			}
@@ -147,65 +108,22 @@ public class RocksDBFoldingState<K, N, T, ACC>
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			byte[] valueBytes = db.get(key);
+			byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			if (valueBytes == null) {
 				baos.reset();
 				valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
-				db.put(writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
 			} else {
 				ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				ACC newValue = foldFunction.fold(oldValue, value);
 				baos.reset();
 				valueSerializer.serialize(newValue, out);
-				db.put(writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
-
-	@Override
-	protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot(
-			URI backupUri, long checkpointId) {
-		
-		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
-	}
-
-	private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
-		private static final long serialVersionUID = 1L;
-
-		public Snapshot(
-				File dbPath,
-				String checkpointPath,
-				URI backupUri,
-				long checkpointId,
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				FoldingStateDescriptor<T, ACC> stateDesc) {
-			
-			super(dbPath,
-				checkpointPath,
-				backupUri,
-				checkpointId,
-				keySerializer,
-				namespaceSerializer,
-				stateDesc);
-		}
-
-		@Override
-		protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, RocksDBStateBackend> 
-			createRocksDBState(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				FoldingStateDescriptor<T, ACC> stateDesc,
-				File dbPath,
-				String backupPath,
-				String restorePath,
-				Options options) throws Exception {
-			
-			return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options);
-		}
-	}
 }
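
From the user side, such a state is declared through a descriptor whose name
identifies the state (and, per this commit, its column family). A hedged usage
sketch; types, names, and the fold function are illustrative:

    FoldingStateDescriptor<Long, String> descriptor = new FoldingStateDescriptor<>(
            "concatenated",     // state name; identifies the state and its column family
            "",                 // initial accumulator value
            new FoldFunction<Long, String>() {
                @Override
                public String fold(String accumulator, Long value) {
                    return accumulator + "," + value;
                }
            },
            String.class);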
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index f5c589c..dc55d11 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -23,17 +23,14 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
 
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -43,11 +40,15 @@ import static java.util.Objects.requireNonNull;
 /**
  * {@link ListState} implementation that stores state in RocksDB.
  *
+ * <p>{@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the values in the list state.
  */
-public class RocksDBListState<K, N, V>
+class RocksDBListState<K, N, V>
 	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
 	implements ListState<V> {
 
@@ -55,59 +56,27 @@ public class RocksDBListState<K, N, V>
 	private final TypeSerializer<V> valueSerializer;
 
 	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final ListStateDescriptor<V> stateDesc;
+	private final ListStateDescriptor<V> stateDesc;
 
 	/**
 	 * We disable writes to the write-ahead-log here. We can't have these in the base class
 	 * because JNI segfaults for some reason if they are.
 	 */
-	protected final WriteOptions writeOptions;
+	private final WriteOptions writeOptions;
 
 	/**
 	 * Creates a new {@code RocksDBListState}.
 	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
-	 */
-	protected RocksDBListState(TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			Options options) {
-		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
-		this.stateDesc = requireNonNull(stateDesc);
-		this.valueSerializer = stateDesc.getSerializer();
-
-		writeOptions = new WriteOptions();
-		writeOptions.setDisableWAL(true);
-	}
-
-	/**
-	 * Creates a {@code RocksDBListState} by restoring from a directory.
-	 *
-	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
-	 * @param restorePath The path on the local file system that we are restoring from.
 	 */
-	protected RocksDBListState(TypeSerializer<K> keySerializer,
+	RocksDBListState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath,
-			Options options) {
+			RocksDBStateBackend backend) {
 		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
+		super(columnFamily, namespaceSerializer, backend);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 
@@ -122,7 +91,7 @@ public class RocksDBListState<K, N, V>
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			byte[] valueBytes = db.get(key);
+			byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			if (valueBytes == null) {
 				return Collections.emptyList();
@@ -155,55 +124,11 @@ public class RocksDBListState<K, N, V>
 			baos.reset();
 
 			valueSerializer.serialize(value, out);
-			db.merge(writeOptions, key, baos.toByteArray());
+			backend.db.merge(columnFamily, writeOptions, key, baos.toByteArray());
 
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
-
-	@Override
-	protected AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>> createRocksDBSnapshot(
-			URI backupUri,
-			long checkpointId) {
-		
-		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
-	}
-
-	private static class Snapshot<K, N, V> extends 
-			AbstractRocksDBSnapshot<K, N, ListState<V>, ListStateDescriptor<V>>
-	{
-		private static final long serialVersionUID = 1L;
-
-		public Snapshot(File dbPath,
-			String checkpointPath,
-			URI backupUri,
-			long checkpointId,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<V> stateDesc) {
-			super(dbPath,
-				checkpointPath,
-				backupUri,
-				checkpointId,
-				keySerializer,
-				namespaceSerializer,
-				stateDesc);
-		}
-
-		@Override
-		protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				ListStateDescriptor<V> stateDesc,
-				File basePath,
-				String backupPath,
-				String restorePath,
-				Options options) throws Exception {
-			
-			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, basePath,
-					checkpointPath, restorePath, options);
-		}
-	}
 }
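
The merge()-based add() above works because RocksDB's StringAppendOperator
concatenates merge operands with a single ',' delimiter byte, so n add() calls
leave n serialized values separated by one byte each. A hedged sketch of the
matching read side, assuming that encoding:

    ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
    DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
    List<V> result = new ArrayList<>();
    while (in.available() > 0) {
        result.add(valueSerializer.deserialize(in));
        if (in.available() > 0) {
            in.skipBytesToRead(1);  // skip the ',' inserted by the merge operator
        }
    }
    return result;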
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 6a965cc..953660d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -24,17 +24,14 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.KvState;
 
-import org.rocksdb.Options;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteOptions;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 
 import static java.util.Objects.requireNonNull;
 
@@ -45,7 +42,7 @@ import static java.util.Objects.requireNonNull;
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state stores.
  */
-public class RocksDBReducingState<K, N, V>
+class RocksDBReducingState<K, N, V>
 	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
 	implements ReducingState<V> {
 
@@ -53,7 +50,7 @@ public class RocksDBReducingState<K, N, V>
 	private final TypeSerializer<V> valueSerializer;
 
 	/** This holds the name of the state and can create an initial default value for the state. */
-	protected final ReducingStateDescriptor<V> stateDesc;
+	private final ReducingStateDescriptor<V> stateDesc;
 
 	/** User-specified reduce function */
 	private final ReduceFunction<V> reduceFunction;
@@ -62,26 +59,21 @@ public class RocksDBReducingState<K, N, V>
 	 * We disable writes to the write-ahead-log here. We can't have these in the base class
 	 * because JNI segfaults for some reason if they are.
 	 */
-	protected final WriteOptions writeOptions;
+	private final WriteOptions writeOptions;
 
 	/**
 	 * Creates a new {@code RocksDBReducingState}.
 	 *
-	 * @param keySerializer The serializer for the keys.
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param stateDesc The state identifier for the state. This contains name
 	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
 	 */
-	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+	RocksDBReducingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			Options options) {
+			RocksDBStateBackend backend) {
 		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
+		super(columnFamily, namespaceSerializer, backend);
 		this.stateDesc = requireNonNull(stateDesc);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();
@@ -90,34 +82,6 @@ public class RocksDBReducingState<K, N, V>
 		writeOptions.setDisableWAL(true);
 	}
 
-	/**
-	 * Creates a {@code RocksDBReducingState} by restoring from a directory.
-	 *
-	 * @param keySerializer The serializer for the keys.
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
-	 * @param dbPath The path on the local system where RocksDB data should be stored.
-	 * @param backupPath The path where to store backups.
-	 * @param restorePath The path on the local file system that we are restoring from.
-	 */
-	protected RocksDBReducingState(TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<V> stateDesc,
-			File dbPath,
-			String backupPath,
-			String restorePath,
-			Options options) {
-		
-		super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
-		this.stateDesc = stateDesc;
-		this.valueSerializer = stateDesc.getSerializer();
-		this.reduceFunction = stateDesc.getReduceFunction();
-
-		writeOptions = new WriteOptions();
-		writeOptions.setDisableWAL(true);
-	}
-
 	@Override
 	public V get() {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -125,7 +89,7 @@ public class RocksDBReducingState<K, N, V>
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			byte[] valueBytes = db.get(key);
+			byte[] valueBytes = backend.db.get(columnFamily, key);
 			if (valueBytes == null) {
 				return null;
 			}
@@ -142,66 +106,22 @@ public class RocksDBReducingState<K, N, V>
 		try {
 			writeKeyAndNamespace(out);
 			byte[] key = baos.toByteArray();
-			byte[] valueBytes = db.get(key);
+			byte[] valueBytes = backend.db.get(columnFamily, key);
 
 			if (valueBytes == null) {
 				baos.reset();
 				valueSerializer.serialize(value, out);
-				db.put(writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
 			} else {
 				V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
 				V newValue = reduceFunction.reduce(oldValue, value);
 				baos.reset();
 				valueSerializer.serialize(newValue, out);
-				db.put(writeOptions, key, baos.toByteArray());
+				backend.db.put(columnFamily, writeOptions, key, baos.toByteArray());
 			}
 		} catch (Exception e) {
 			throw new RuntimeException("Error while adding data to RocksDB", e);
 		}
 	}
-
-	@Override
-	protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>> createRocksDBSnapshot(
-			URI backupUri,
-			long checkpointId) {
-		
-		return new Snapshot<>(basePath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
-	}
-
-	private static class Snapshot<K, N, V> extends 
-			AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>>
-	{
-		private static final long serialVersionUID = 1L;
-
-		public Snapshot(File dbPath,
-			String checkpointPath,
-			URI backupUri,
-			long checkpointId,
-			TypeSerializer<K> keySerializer,
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<V> stateDesc) {
-			super(dbPath,
-				checkpointPath,
-				backupUri,
-				checkpointId,
-				keySerializer,
-				namespaceSerializer,
-				stateDesc);
-		}
-
-		@Override
-		protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<N> namespaceSerializer,
-				ReducingStateDescriptor<V> stateDesc,
-				File basePath,
-				String backupPath,
-				String restorePath,
-				Options options) throws Exception {
-			
-			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
-					basePath, checkpointPath, restorePath, options);
-		}
-	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c33325e3/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 8f846da..3d63bd2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,15 +17,21 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -33,18 +39,40 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.rocksdb.Options;
-import org.rocksdb.StringAppendOperator;
+import org.apache.flink.streaming.util.HDFSCopyFromLocal;
+import org.apache.flink.streaming.util.HDFSCopyToLocal;
+import org.apache.hadoop.fs.FileSystem;
+import org.rocksdb.BackupEngine;
+import org.rocksdb.BackupableDBOptions;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RestoreOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +95,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
-	
+
+	// ------------------------------------------------------------------------
+	//  Static configuration values
+	// ------------------------------------------------------------------------
 	
 	/** The checkpoint directory that we copy the RocksDB backups to. */
 	private final Path checkpointDirectory;
@@ -75,6 +106,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	/** The state backend that stores the non-partitioned state */
 	private final AbstractStateBackend nonPartitionedStateBackend;
 
+	/** Whether we do snapshots fully asynchronously */
+	private boolean fullyAsyncBackup = false;
+
 	/** Operator identifier that is used to uniquify the RocksDB storage path. */
 	private String operatorIdentifier;
 
@@ -100,8 +134,35 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private OptionsFactory optionsFactory;
 	
 	/** The options from the options factory, cached */
-	private transient Options rocksDbOptions;
-	
+	private transient DBOptions dbOptions;
+	private transient ColumnFamilyOptions columnOptions;
+
+	// ------------------------------------------------------------------------
+	//  Per-operator values that are set in initializeForJob
+	// ------------------------------------------------------------------------
+
+	/** Base path where this configured instance stores its data */
+	private transient File instanceBasePath;
+
+	/** Path where this configured instance stores its RocksDB database */
+	private transient File instanceRocksDBPath;
+
+	/** Base path where this configured instance stores checkpoints */
+	private transient String instanceCheckpointPath;
+
+	/**
+	 * Our RocksDB database; the concrete subclasses of {@link AbstractRocksDBState} use it
+	 * to store state. The different k/v states that we have don't each have their own RocksDB
+	 * instance; they all write to this one database, each to its own column family.
+	 */
+	transient RocksDB db;
+
+	/**
+	 * Information about the k/v states as we create them. This is used to retrieve the
+	 * column family that is used for a state and also for sanity checks when restoring.
+	 */
+	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -141,15 +202,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 
-	public RocksDBStateBackend(
-			String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-		
+	public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
 		this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
 	}
 	
-	public RocksDBStateBackend(
-			URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-
+	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
 		this.nonPartitionedStateBackend = requireNonNull(nonPartitionedStateBackend);
 		this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
 	}
@@ -201,6 +258,40 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		}
 		
 		nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
+
+		instanceBasePath = new File(getDbPath("dummy_state"), UUID.randomUUID().toString());
+		instanceCheckpointPath = getCheckpointPath("dummy_state");
+		instanceRocksDBPath = new File(instanceBasePath, "db");
+
+		RocksDB.loadLibrary();
+
+		if (!instanceBasePath.exists()) {
+			if (!instanceBasePath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		// clean it; this will remove the last part of the path, but RocksDB will recreate it
+		try {
+			if (instanceRocksDBPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+				FileUtils.deleteDirectory(instanceRocksDBPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
+		// RocksDB requires the default column family to be listed when opening with column families
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+		try {
+			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+
+		kvStateInformation = new HashMap<>();
 	}
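For reference, the per-operator paths initialized above resolve to the following layout (derived from getDbPath() and getCheckpointPath(); the bracketed parts are placeholders):

    <storagePath>/<jobID>/<operatorIdentifier>/dummy_state/<random-UUID>      <- instanceBasePath
    <storagePath>/<jobID>/<operatorIdentifier>/dummy_state/<random-UUID>/db   <- instanceRocksDBPath (the RocksDB instance)
    <checkpointDirectory>/<jobID>/<operatorIdentifier>/dummy_state            <- instanceCheckpointPath (checkpoint copies)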
 
 	@Override
@@ -209,29 +300,43 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public void dispose() {
+		super.dispose();
+		nonPartitionedStateBackend.dispose();
+
+		if (this.dbOptions != null) {
+			this.dbOptions.dispose();
+			this.dbOptions = null;
+		}
+		// guard against dispose() being called before initializeForJob()
+		if (kvStateInformation != null) {
+			for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+				column.f0.dispose();
+			}
+		}
+		if (db != null) {
+			db.dispose();
+		}
+	}
+
+	@Override
 	public void close() throws Exception {
 		nonPartitionedStateBackend.close();
 		
-		Options opt = this.rocksDbOptions;
-		if (opt != null) {
-			opt.dispose();
-			this.rocksDbOptions = null;
+		if (this.dbOptions != null) {
+			this.dbOptions.dispose();
+			this.dbOptions = null;
 		}
+		// guard against close() being called before initializeForJob()
+		if (kvStateInformation != null) {
+			for (Tuple2<ColumnFamilyHandle, StateDescriptor> column: kvStateInformation.values()) {
+				column.f0.dispose();
+			}
+		}
+		if (db != null) {
+			db.dispose();
+		}
 	}
 
-	File getDbPath(String stateName) {
+	private File getDbPath(String stateName) {
 		return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
 	}
 
-	String getCheckpointPath(String stateName) {
+	private String getCheckpointPath(String stateName) {
 		return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
 	}
-	
-	File[] getStoragePaths() {
-		return initializedDbBasePaths;
-	}
-	
-	File getNextStoragePath() {
+
+	private File getNextStoragePath() {
 		int ni = nextDirectory + 1;
 		ni = ni >= initializedDbBasePaths.length ? 0 : ni;
 		nextDirectory = ni;
@@ -239,53 +344,545 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		return initializedDbBasePaths[ni];
 	}
 
+	/**
+	 * Visible for tests.
+	 */
+	public File[] getStoragePaths() {
+		return initializedDbBasePaths;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Snapshot and restore
+	// ------------------------------------------------------------------------
+
+	@Override
+	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
+		if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
+			return new HashMap<>();
+		}
+
+		if (fullyAsyncBackup) {
+			return performFullyAsyncSnapshot(checkpointId, timestamp);
+		} else {
+			return performSemiAsyncSnapshot(checkpointId, timestamp);
+		}
+	}
+
+	/**
+	 * Performs a checkpoint by using the RocksDB backup feature to back up to a directory.
+	 * This backup is then asynchronously copied to the final checkpoint location.
+	 */
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+		// We don't snapshot individual k/v states since everything is stored in a central
+		// RocksDB database. Create a dummy KvStateSnapshot that holds the information about
+		// that checkpoint. We use it in injectKeyValueStateSnapshots to restore.
+
+		final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
+		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+		if (!localBackupPath.exists()) {
+			if (!localBackupPath.mkdirs()) {
+				throw new RuntimeException("Could not create local backup path " + localBackupPath);
+			}
+		}
+
+		long startTime = System.currentTimeMillis();
+
+		BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
+		// we disabled the WAL, so there are no log files to back up
+		backupOptions.setBackupLogFiles(false);
+		// no need to sync; the backup is only intermediate data before writing the FileSystem snapshot
+		backupOptions.setSync(false);
+
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
+			// "true" means that the memtable is flushed before the backup is taken
+			backupEngine.createNewBackup(db, true);
+		}
+
+		long endTime = System.currentTimeMillis();
+		LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+		// draw a copy in case it gets changed while performing the async snapshot
+		List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
+		for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
+			kvStateInformationCopy.add(state.f1);
+		}
+		SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
+				backupUri,
+				kvStateInformationCopy,
+				checkpointId);
+
+		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+		result.put("dummy_state", dummySnapshot);
+		return result;
+	}
+
+	/**
+	 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
+	 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
+	 * location. The only synchronous part is the drawing of the {@code Snapshot}, which
+	 * is essentially free.
+	 */
+	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
+		// we draw a snapshot from RocksDB and then iterate over all key/value pairs
+		// as of that snapshot to store them in the backup location
+
+		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
+
+		long startTime = System.currentTimeMillis();
+
+		org.rocksdb.Snapshot snapshot = db.getSnapshot();
+
+		long endTime = System.currentTimeMillis();
+		LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
+
+		// draw a copy in case it gets changed while performing the async snapshot
+		Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
+		columnFamiliesCopy.putAll(kvStateInformation);
+		FullyAsynSnapshot dummySnapshot = new FullyAsynSnapshot(db,
+				snapshot,
+				this,
+				backupUri,
+				columnFamiliesCopy,
+				checkpointId);
+
+		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
+		result.put("dummy_state", dummySnapshot);
+		return result;
+	}
+
+	@Override
+	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception {
+		if (keyValueStateSnapshots.size() == 0) {
+			return;
+		}
+
+		KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
+		if (dummyState instanceof FinalSemiAsyncSnapshot) {
+			restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
+		} else if (dummyState instanceof FinalFullyAsyncSnapshot) {
+			restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
+		} else {
+			throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
+		}
+	}
+
+	private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception {
+		// This does mostly the same work as initializeForJob: we remove the existing RocksDB
+		// directory and create a new one from the backup.
+		// This should be refactored so that the StateBackend is initialized either from
+		// scratch or from a snapshot, not both in sequence.
+
+		if (!instanceBasePath.exists()) {
+			if (!instanceBasePath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		db.dispose();
+
+		// clean it; this will remove the last part of the path, but RocksDB will recreate it
+		try {
+			if (instanceRocksDBPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+				FileUtils.deleteDirectory(instanceRocksDBPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		final File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId);
+
+		if (localBackupPath.exists()) {
+			try {
+				LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
+				FileUtils.deleteDirectory(localBackupPath);
+			} catch (IOException e) {
+				throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
+			}
+		}
+
+		HDFSCopyToLocal.copyToLocal(snapshot.backupUri, instanceBasePath);
+
+		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
+			backupEngine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
+		} catch (RocksDBException|IllegalArgumentException e) {
+			throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e);
+		} finally {
+			try {
+				FileUtils.deleteDirectory(localBackupPath);
+			} catch (IOException e) {
+				LOG.error("Error cleaning up local restore directory " + localBackupPath, e);
+			}
+		}
+
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(snapshot.stateDescriptors.size());
+		for (StateDescriptor stateDescriptor: snapshot.stateDescriptors) {
+			columnFamilyDescriptors.add(new ColumnFamilyDescriptor(stateDescriptor.getName().getBytes(), getColumnOptions()));
+		}
+
+		// RocksDB requires the default column family to be listed when opening with column families
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(snapshot.stateDescriptors.size());
+		try {
+
+			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+			this.kvStateInformation = new HashMap<>();
+			for (int i = 0; i < snapshot.stateDescriptors.size(); i++) {
+				this.kvStateInformation.put(snapshot.stateDescriptors.get(i).getName(), new Tuple2<>(columnFamilyHandles.get(i), snapshot.stateDescriptors.get(i)));
+			}
+
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+	}
+
+	private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
+
+		DataInputView inputView = snapshot.stateHandle.getState(userCodeClassLoader);
+
+		// clear k/v state information before filling it
+		kvStateInformation.clear();
+
+		// first get the column family mapping
+		int numColumns = inputView.readInt();
+		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+		for (int i = 0; i < numColumns; i++) {
+			byte mappingByte = inputView.readByte();
+
+			ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView));
+			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+
+			columnFamilyMapping.put(mappingByte, stateDescriptor);
+
+			// this will fill in the k/v state information
+			getColumnFamily(stateDescriptor);
+		}
+
+		// read key/value pairs until we hit EOF
+		try {
+			// the EOFException from the input view terminates the loop
+			while (true) {
+				byte mappingByte = inputView.readByte();
+				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+				byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+				byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
+				db.put(handle, key, value);
+			}
+		} catch (EOFException e) {
+			// expected
+		}
+	}
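For reference, the stream read back here (and written by FullyAsynSnapshot#materialize() below) has the following layout; this is a summary of the code, not a separate format spec:

    numColumns : int
    numColumns times:
        columnByte      : byte
        stateDescriptor : Java-serialized StateDescriptor (via ObjectOutputStream)
    repeated until EOF:
        columnByte : byte
        key        : byte[], written by BytePrimitiveArraySerializer
        value      : byte[], written by BytePrimitiveArraySerializer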
+
+	// ------------------------------------------------------------------------
+	//  Semi-asynchronous Backup Classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Upon snapshotting, the RocksDB backup is created synchronously. The asynchronous part is
+	 * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}.
+	 */
+	private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+		private static final long serialVersionUID = 1L;
+		private final File localBackupPath;
+		private final URI backupUri;
+		private final List<StateDescriptor> stateDescriptors;
+		private final long checkpointId;
+
+		private SemiAsyncSnapshot(File localBackupPath,
+				URI backupUri,
+				List<StateDescriptor> columnFamilies,
+				long checkpointId) {
+			this.localBackupPath = localBackupPath;
+			this.backupUri = backupUri;
+			this.stateDescriptors = columnFamilies;
+			this.checkpointId = checkpointId;
+		}
+
+		@Override
+		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
+			try {
+				long startTime = System.currentTimeMillis();
+				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+				long endTime = System.currentTimeMillis();
+				LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
+				return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
+			} catch (Exception e) {
+				FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+				fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+				throw e;
+			} finally {
+				FileUtils.deleteQuietly(localBackupPath);
+			}
+		}
+	}
+
+	/**
+	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database. This
+	 * also stores the column families that we had at the time of the snapshot so that we can
+	 * restore them. This results from {@link SemiAsyncSnapshot}.
+	 */
+	private static class FinalSemiAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+		private static final long serialVersionUID = 1L;
+
+		final URI backupUri;
+		final long checkpointId;
+		private final List<StateDescriptor> stateDescriptors;
+
+		/**
+		 * Creates a new snapshot from the given state parameters.
+		 */
+		private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List<StateDescriptor> stateDescriptors) {
+			this.backupUri = backupUri;
+			this.checkpointId = checkpointId;
+			this.stateDescriptors = stateDescriptors;
+		}
+
+		@Override
+		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
+				RocksDBStateBackend stateBackend,
+				TypeSerializer<Object> keySerializer,
+				ClassLoader classLoader,
+				long recoveryTimestamp) throws Exception {
+			throw new RuntimeException("Should never happen.");
+		}
+
+		@Override
+		public final void discardState() throws Exception {
+			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+			fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
+		}
+
+		@Override
+		public final long getStateSize() throws Exception {
+			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
+			return fs.getContentSummary(new org.apache.hadoop.fs.Path(backupUri)).getLength();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Fully asynchronous Backup Classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This does the snapshot using a RocksDB snapshot and an iterator over all keys
+	 * at the point of that snapshot.
+	 */
+	private static class FullyAsynSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+		private static final long serialVersionUID = 1L;
+
+		private final transient RocksDB db;
+		private transient org.rocksdb.Snapshot snapshot;
+		private transient AbstractStateBackend backend;
+
+		private final URI backupUri;
+		private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
+		private final long checkpointId;
+
+		private FullyAsynSnapshot(RocksDB db,
+				org.rocksdb.Snapshot snapshot,
+				AbstractStateBackend backend,
+				URI backupUri,
+				Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies,
+				long checkpointId) {
+			this.db = db;
+			this.snapshot = snapshot;
+			this.backend = backend;
+			this.backupUri = backupUri;
+			this.columnFamilies = columnFamilies;
+			this.checkpointId = checkpointId;
+		}
+
+		@Override
+		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
+			try {
+				long startTime = System.currentTimeMillis();
+
+				CheckpointStateOutputView outputView = backend.createCheckpointStateOutputView(checkpointId, startTime);
+
+				outputView.writeInt(columnFamilies.size());
+
+				// we don't know how many key/value pairs there are in each column family,
+				// so we prefix every written element with a byte that signifies to which
+				// column family it belongs; this way we can restore the column families
+				byte count = 0;
+				Map<String, Byte> columnFamilyMapping = new HashMap<>();
+				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
+					columnFamilyMapping.put(column.getKey(), count);
+
+					outputView.writeByte(count);
+
+					ObjectOutputStream ooOut = new ObjectOutputStream(outputView);
+					ooOut.writeObject(column.getValue().f1);
+					ooOut.flush();
+
+					count++;
+				}
+
+				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
+					byte columnByte = columnFamilyMapping.get(column.getKey());
+					ReadOptions readOptions = new ReadOptions();
+					readOptions.setSnapshot(snapshot);
+					RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
+					iterator.seekToFirst();
+					while (iterator.isValid()) {
+						outputView.writeByte(columnByte);
+						BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(), outputView);
+						BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(), outputView);
+						iterator.next();
+					}
+					// dispose the native iterator and read options so they don't leak across checkpoints
+					iterator.dispose();
+					readOptions.dispose();
+				}
+
+				StateHandle<DataInputView> stateHandle = outputView.closeAndGetHandle();
+
+				long endTime = System.currentTimeMillis();
+				LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
+				return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
+			} finally {
+				db.releaseSnapshot(snapshot);
+				snapshot = null;
+			}
+		}
+
+		@Override
+		protected void finalize() throws Throwable {
+			if (snapshot != null) {
+				db.releaseSnapshot(snapshot);
+			}
+			super.finalize();
+		}
+	}
+
+	/**
+	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database. This
+	 * results from {@link FullyAsynSnapshot}.
+	 */
+	private static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
+		private static final long serialVersionUID = 1L;
+
+		final StateHandle<DataInputView> stateHandle;
+		final long checkpointId;
+
+		/**
+		 * Creates a new snapshot from the given state parameters.
+		 */
+		private FinalFullyAsyncSnapshot(StateHandle<DataInputView> stateHandle, long checkpointId) {
+			this.stateHandle = stateHandle;
+			this.checkpointId = checkpointId;
+		}
+
+		@Override
+		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
+				RocksDBStateBackend stateBackend,
+				TypeSerializer<Object> keySerializer,
+				ClassLoader classLoader,
+				long recoveryTimestamp) throws Exception {
+			throw new RuntimeException("Should never happen.");
+		}
+
+		@Override
+		public final void discardState() throws Exception {
+			stateHandle.discardState();
+		}
+
+		@Override
+		public final long getStateSize() throws Exception {
+			return stateHandle.getStateSize();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  State factories
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
+	 * we don't restore the individual k/v states, just the global RocksDB database and the
+	 * list of column families. When a k/v state is first requested we check here whether we
+	 * already have a column family for that state and return it, or create a new one if it
+	 * doesn't exist.
+	 *
+	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
+	 * that we checkpointed, i.e. the one already in the map of column families.
+	 */
+	private ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+
+		Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
+
+		if (stateInfo != null) {
+			if (!stateInfo.f1.equals(descriptor)) {
+				throw new RuntimeException("Trying to access state using wrong StateDescriptor: registered " + stateInfo.f1 + ", but access was attempted with " + descriptor);
+			}
+			return stateInfo.f0;
+		}
+
+		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
+
+		try {
+			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
+			kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+			return columnFamily;
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+		}
+	}
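To illustrate the contract: repeated accesses under the same state name must use an equal descriptor, otherwise the sanity check above fires. A minimal sketch, assuming the usual ValueStateDescriptor(name, serializer, defaultValue) constructor; the names and values are made up:

    ValueStateDescriptor<Long> counter =
            new ValueStateDescriptor<>("counter", LongSerializer.INSTANCE, 0L);

    ColumnFamilyHandle cf1 = getColumnFamily(counter); // creates the column family
    ColumnFamilyHandle cf2 = getColumnFamily(counter); // returns the cached handle, cf1 == cf2

    // a descriptor named "counter" with, e.g., a different default value would fail
    // the equals() check above and throw a RuntimeException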
+
+	/**
+	 * Used by k/v states to access the current key.
+	 */
+	Object currentKey() {
+		return currentKey;
+	}
+
+	/**
+	 * Used by k/v states to access the key serializer.
+	 */
+	TypeSerializer keySerializer() {
+		return keySerializer;
+	}
+
 	@Override
 	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<T> stateDesc) throws Exception {
 
-		File dbPath = getDbPath(stateDesc.getName());
-		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		
-		return new RocksDBValueState<>(keySerializer, namespaceSerializer, 
-				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
 
 	@Override
 	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
-		File dbPath = getDbPath(stateDesc.getName());
-		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		
-		return new RocksDBListState<>(keySerializer, namespaceSerializer, 
-				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
 
 	@Override
 	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<T> stateDesc) throws Exception {
 
-		File dbPath = getDbPath(stateDesc.getName());
-		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		
-		return new RocksDBReducingState<>(keySerializer, namespaceSerializer, 
-				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
 
 	@Override
 	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
-		File dbPath = getDbPath(stateDesc.getName());
-		String checkpointPath = getCheckpointPath(stateDesc.getName());
-		return new RocksDBFoldingState<>(keySerializer, namespaceSerializer,
-				stateDesc, dbPath, checkpointPath, getRocksDBOptions());
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Non-partitioned state
+	// ------------------------------------------------------------------------
+
 	@Override
 	public CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception {
@@ -305,6 +902,28 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Enables fully asynchronous snapshotting of the partitioned state held in RocksDB.
+	 *
+	 * <p>By default, this is disabled. This means that RocksDB state is copied in a synchronous
+	 * step, during which normal processing of elements pauses, followed by an asynchronous step
+	 * of copying the RocksDB backup to the final checkpoint location. Fully asynchronous
+	 * snapshots take longer to complete (the time grows linearly with the number of unique keys),
+	 * but normal processing of elements is not paused.
+	 */
+	public void enableFullyAsyncSnapshots() {
+		this.fullyAsyncBackup = true;
+	}
+
+	/**
+	 * Disables fully asynchronous snapshotting of the partitioned state held in RocksDB.
+	 *
+	 * <p>By default, this is disabled.
+	 */
+	public void disableFullyAsyncSnapshots() {
+		this.fullyAsyncBackup = false;
+	}
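A minimal usage sketch for the new toggle; the checkpoint URI is a placeholder, MemoryStateBackend stands in for any non-partitioned backend, and env is assumed to be a StreamExecutionEnvironment:

    RocksDBStateBackend backend = new RocksDBStateBackend(
            "hdfs:///flink/checkpoints", new MemoryStateBackend());
    backend.enableFullyAsyncSnapshots();
    env.setStateBackend(backend);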
+
+	/**
 	 * Sets the path where the RocksDB local database files should be stored on the local
 	 * file system. Setting this path overrides the default behavior, where the
 	 * files are stored across the configured temp directories.
@@ -431,24 +1050,41 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	/**
-	 * Gets the RocksDB Options to be used for all RocksDB instances.
+	 * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
 	 */
-	Options getRocksDBOptions() {
-		if (rocksDbOptions == null) {
+	public DBOptions getDbOptions() {
+		if (dbOptions == null) {
 			// initial options from pre-defined profile
-			Options opt = predefinedOptions.createOptions();
+			DBOptions opt = predefinedOptions.createDBOptions();
 
 			// add user-defined options, if specified
 			if (optionsFactory != null) {
-				opt = optionsFactory.createOptions(opt);
+				opt = optionsFactory.createDBOptions(opt);
 			}
 			
 			// add necessary default options
 			opt = opt.setCreateIfMissing(true);
-			opt = opt.setMergeOperator(new StringAppendOperator());
-			
-			rocksDbOptions = opt;
+
+			dbOptions = opt;
+		}
+		return dbOptions;
+	}
+
+	/**
+	 * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances.
+	 */
+	public ColumnFamilyOptions getColumnOptions() {
+		if (columnOptions == null) {
+			// initial options from pre-defined profile
+			ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
+
+			// add user-defined options, if specified
+			if (optionsFactory != null) {
+				opt = optionsFactory.createColumnOptions(opt);
+			}
+
+			columnOptions = opt;
 		}
-		return rocksDbOptions;
+		return columnOptions;
 	}
 }
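With the options now split into DBOptions and ColumnFamilyOptions, a custom OptionsFactory has to provide both pieces. A minimal sketch against the two methods invoked in getDbOptions() and getColumnOptions(); the tuning values are illustrative, and setOptions(...) is assumed to be the backend's existing factory setter:

    backend.setOptions(new OptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // applied on top of the predefined options profile
            return currentOptions.setMaxOpenFiles(1024);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // applied to every column family, i.e. every k/v state
            return currentOptions.setWriteBufferSize(32 * 1024 * 1024);
        }
    });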