Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:31 UTC

[13/27] flink git commit: [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware

[FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware

The biggest change is that functionality that used to live in
AbstractStateBackend is now split between CheckpointStreamFactory and
KeyedStateBackend. The former is responsible for providing streams that
can be used to checkpoint data, while the latter is responsible for
keeping keyed state. A keyed backend checkpoints the state that it
keeps by writing it to streams obtained from a CheckpointStreamFactory.
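
As a rough illustration of the new split (a sketch, not code from this
patch), a keyed backend can write its state through the factory roughly
like this; the createCheckpointStateOutputStream method is assumed here,
modeled on the stream-creation method that previously lived on
AbstractStateBackend:

    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.StreamStateHandle;

    // Sketch: serialize keyed state into a stream obtained from the factory.
    StreamStateHandle writeKeyedState(
            CheckpointStreamFactory streamFactory,
            long checkpointId,
            long timestamp) throws Exception {

        // assumed factory method, modeled on the pre-refactor API
        CheckpointStreamFactory.CheckpointStateOutputStream out =
                streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);

        DataOutputView view = new DataOutputViewStreamWrapper(out);
        // ... serialize the backend's keyed state into 'view' ...

        // the returned handle is what gets stored for later restore
        return out.closeAndGetHandle();
    }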

This also refactors how asynchronous keyed state snapshots work. They
are now implemented using a Future/RunnableFuture.
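
Concretely, snapshot() now returns a Future of the resulting state
handle (see the new RocksDBKeyedStateBackend.snapshot signature below).
A backend that snapshots synchronously can return an already-completed
future (the patch adds DoneFuture for that); an asynchronous backend can
return a RunnableFuture whose run() does the expensive materialization
off the task thread. A sketch, with writeSnapshot as a hypothetical
helper:

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;

    // Sketch: an asynchronous snapshot as a RunnableFuture. The task can
    // run() it on a separate thread and get() the handle when it is done.
    RunnableFuture<KeyGroupsStateHandle> snapshotAsync(
            final long checkpointId, final long timestamp) {
        return new FutureTask<KeyGroupsStateHandle>(
                new Callable<KeyGroupsStateHandle>() {
                    @Override
                    public KeyGroupsStateHandle call() throws Exception {
                        // hypothetical helper doing the actual writing
                        return writeSnapshot(checkpointId, timestamp);
                    }
                });
    }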

Also, this changes the keyed state backends to be key-group aware: they
snapshot their state per key group, together with an offset index that
is used for restoring.
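
In sketch form (method names on the new classes are assumptions based on
the imports visible in this patch): state is written one key group at a
time and the stream offset where each group starts is recorded, so that
a restore can seek directly to the key groups assigned to a subtask:

    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
    import org.apache.flink.runtime.state.KeyGroupsStateHandle;

    // Sketch: write state per key group and record an offset index.
    KeyGroupsStateHandle snapshotByKeyGroup(
            KeyGroupRange keyGroupRange,
            CheckpointStreamFactory.CheckpointStateOutputStream out)
            throws Exception {

        KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(keyGroupRange);
        for (int keyGroup : keyGroupRange) { // assumes the range is iterable
            // remember where this key group starts in the stream
            offsets.setKeyGroupOffset(keyGroup, out.getPos());
            // ... write all state belonging to 'keyGroup' ...
        }
        // pair the written data with the offset index for restore
        return new KeyGroupsStateHandle(offsets, out.closeAndGetHandle());
    }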


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4809f536
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4809f536
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4809f536

Branch: refs/heads/master
Commit: 4809f5367b08a9734fc1bd4875be51a9f3bb65aa
Parents: 516ad01
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Aug 10 18:44:50 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  30 +-
 .../streaming/state/RocksDBFoldingState.java    |   2 +-
 .../state/RocksDBKeyedStateBackend.java         | 251 ++++++
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../streaming/state/RocksDBReducingState.java   |   4 +-
 .../streaming/state/RocksDBStateBackend.java    | 884 +++----------------
 .../streaming/state/RocksDBValueState.java      |   2 +-
 .../FullyAsyncRocksDBStateBackendTest.java      |  65 --
 .../state/RocksDBAsyncKVSnapshotTest.java       |   4 +-
 .../state/RocksDBStateBackendConfigTest.java    | 644 +++++++-------
 .../state/RocksDBStateBackendTest.java          |  29 +-
 .../storm/tests/StormFieldsGroupingITCase.java  |  12 +-
 .../flink/storm/wrappers/BoltWrapper.java       |   2 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |   4 +-
 .../org/apache/flink/api/common/TaskInfo.java   |  13 +-
 .../common/operators/CollectionExecutor.java    |   8 +-
 .../flink/core/fs/FSDataOutputStream.java       |   2 +
 .../core/fs/local/LocalDataOutputStream.java    |   5 +
 .../memory/ByteArrayOutputStreamWithPos.java    | 281 ++++++
 .../functions/util/RuntimeUDFContextTest.java   |   2 +-
 .../api/common/io/RichInputFormatTest.java      |   2 +-
 .../api/common/io/RichOutputFormatTest.java     |   2 +-
 .../operators/GenericDataSinkBaseTest.java      |   2 +-
 .../operators/GenericDataSourceBaseTest.java    |   2 +-
 .../base/FlatMapOperatorCollectionTest.java     |   2 +-
 .../base/InnerJoinOperatorBaseTest.java         |   2 +-
 .../common/operators/base/MapOperatorTest.java  |   2 +-
 .../base/PartitionMapOperatorTest.java          |   2 +-
 .../flink/hdfstests/FileStateBackendTest.java   | 117 +--
 .../base/CoGroupOperatorCollectionTest.java     |   2 +-
 .../operators/base/GroupReduceOperatorTest.java |   2 +-
 .../base/InnerJoinOperatorBaseTest.java         |   2 +-
 .../operators/base/ReduceOperatorTest.java      |   2 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |   2 +-
 .../AbstractKeyedCEPPatternOperator.java        |   2 +
 .../flink/cep/operator/CEPOperatorTest.java     |  62 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../deployment/TaskDeploymentDescriptor.java    |  16 +-
 .../runtime/executiongraph/ExecutionVertex.java |   1 +
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |   5 +
 .../flink/runtime/query/KvStateRegistry.java    |   6 +-
 .../runtime/query/TaskKvStateRegistry.java      |   2 +-
 .../query/netty/KvStateServerHandler.java       |   8 +-
 .../flink/runtime/state/AbstractHeapState.java  | 220 -----
 .../runtime/state/AbstractStateBackend.java     | 422 +--------
 .../state/AsynchronousKvStateSnapshot.java      |  68 --
 .../runtime/state/CheckpointStreamFactory.java  |  67 ++
 .../apache/flink/runtime/state/DoneFuture.java  |  70 ++
 .../runtime/state/GenericFoldingState.java      |  74 +-
 .../flink/runtime/state/GenericListState.java   |  69 +-
 .../runtime/state/GenericReducingState.java     |  74 +-
 .../flink/runtime/state/KeyGroupRange.java      |   3 +
 .../flink/runtime/state/KeyedStateBackend.java  | 340 +++++++
 .../org/apache/flink/runtime/state/KvState.java |  39 +-
 .../flink/runtime/state/KvStateSnapshot.java    |  61 --
 .../state/RetrievableStreamStateHandle.java     |   2 +-
 .../apache/flink/runtime/state/StateObject.java |   5 +-
 .../flink/runtime/state/StreamStateHandle.java  |   4 +-
 .../state/filesystem/AbstractFsState.java       |  95 --
 .../filesystem/AbstractFsStateSnapshot.java     | 139 ---
 .../state/filesystem/FileStateHandle.java       |   2 +-
 .../filesystem/FsCheckpointStreamFactory.java   | 313 +++++++
 .../state/filesystem/FsFoldingState.java        | 161 ----
 .../runtime/state/filesystem/FsListState.java   | 149 ----
 .../state/filesystem/FsReducingState.java       | 165 ----
 .../state/filesystem/FsStateBackend.java        | 371 +-------
 .../runtime/state/filesystem/FsValueState.java  | 148 ----
 .../runtime/state/heap/AbstractHeapState.java   | 187 ++++
 .../runtime/state/heap/HeapFoldingState.java    | 124 +++
 .../state/heap/HeapKeyedStateBackend.java       | 328 +++++++
 .../flink/runtime/state/heap/HeapListState.java | 156 ++++
 .../runtime/state/heap/HeapReducingState.java   | 123 +++
 .../runtime/state/heap/HeapValueState.java      | 112 +++
 .../flink/runtime/state/heap/StateTable.java    |  77 ++
 .../runtime/state/memory/AbstractMemState.java  |  82 --
 .../state/memory/AbstractMemStateSnapshot.java  | 144 ---
 .../state/memory/ByteStreamStateHandle.java     |   2 +-
 .../memory/MemCheckpointStreamFactory.java      | 146 +++
 .../runtime/state/memory/MemFoldingState.java   | 135 ---
 .../runtime/state/memory/MemListState.java      | 120 ---
 .../runtime/state/memory/MemReducingState.java  | 139 ---
 .../runtime/state/memory/MemValueState.java     | 122 ---
 .../state/memory/MemoryStateBackend.java        | 178 +---
 .../savepoint/SavepointLoaderTest.java          |   6 +-
 .../TaskDeploymentDescriptorTest.java           |   4 +-
 .../messages/CheckpointMessagesTest.java        |   2 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  14 +-
 .../operators/testutils/DummyEnvironment.java   |  11 +-
 .../operators/testutils/MockEnvironment.java    |  22 +-
 .../runtime/query/QueryableStateClientTest.java |  35 +-
 .../runtime/query/netty/KvStateClientTest.java  |  41 +-
 .../query/netty/KvStateServerHandlerTest.java   | 222 +++--
 .../runtime/query/netty/KvStateServerTest.java  |  51 +-
 .../runtime/state/FileStateBackendTest.java     | 110 +--
 .../runtime/state/MemoryStateBackendTest.java   |  19 +-
 .../runtime/state/StateBackendTestBase.java     | 650 ++++++++------
 .../FsCheckpointStateOutputStreamTest.java      |  27 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   2 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  27 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   2 +-
 .../source/ContinuousFileReaderOperator.java    |   2 +-
 .../api/graph/StreamGraphGenerator.java         |   1 -
 .../api/operators/AbstractStreamOperator.java   | 144 ++-
 .../operators/AbstractUdfStreamOperator.java    |  23 +-
 .../streaming/api/operators/StreamOperator.java |   7 +-
 .../operators/GenericWriteAheadSink.java        |  11 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   4 +-
 .../operators/windowing/WindowOperator.java     |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     | 371 ++++----
 .../api/operators/StreamGroupedFoldTest.java    |  12 +-
 .../api/operators/StreamGroupedReduceTest.java  |   9 +-
 .../operators/StreamingRuntimeContextTest.java  |  30 +-
 .../operators/StreamOperatorChainingTest.java   |  15 -
 ...AlignedProcessingTimeWindowOperatorTest.java | 235 ++---
 ...AlignedProcessingTimeWindowOperatorTest.java | 279 +++---
 .../windowing/EvictingWindowOperatorTest.java   |  13 +-
 .../operators/windowing/WindowOperatorTest.java | 159 ++--
 .../tasks/InterruptSensitiveRestoreTest.java    |   4 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  22 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   2 +-
 .../KeyedOneInputStreamOperatorTestHarness.java | 201 +++++
 .../flink/streaming/util/MockContext.java       |   4 +-
 .../util/OneInputStreamOperatorTestHarness.java |  63 +-
 .../flink/streaming/util/TestHarnessUtil.java   |   5 +-
 .../streaming/util/WindowingTestHarness.java    |   3 +-
 .../EventTimeWindowCheckpointingITCase.java     |  13 +-
 .../test/classloading/ClassLoaderITCase.java    |   2 +-
 .../classloading/jar/CustomKvStateProgram.java  |  37 +-
 .../state/StateHandleSerializationTest.java     |  14 -
 .../streaming/runtime/StateBackendITCase.java   |  70 +-
 131 files changed, 4955 insertions(+), 5810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 3c4a209..710f506 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
@@ -46,7 +45,7 @@ import java.io.IOException;
  * @param <SD> The type of {@link StateDescriptor}.
  */
 public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V>
-		implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
+		implements KvState<N>, State {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
 
@@ -57,7 +56,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	private N currentNamespace;
 
 	/** Backend that holds the actual RocksDB instance where we store state */
-	protected RocksDBStateBackend backend;
+	protected RocksDBKeyedStateBackend backend;
 
 	/** The column family of this particular instance of state */
 	protected ColumnFamilyHandle columnFamily;
@@ -77,7 +76,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	protected AbstractRocksDBState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			SD stateDesc,
-			RocksDBStateBackend backend) {
+			RocksDBKeyedStateBackend backend) {
 
 		this.namespaceSerializer = namespaceSerializer;
 		this.backend = backend;
@@ -106,7 +105,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	}
 
 	protected void writeKeyAndNamespace(DataOutputView out) throws IOException {
-		backend.keySerializer().serialize(backend.currentKey(), out);
+		backend.getKeySerializer().serialize(backend.getCurrentKey(), out);
 		out.writeByte(42);
 		namespaceSerializer.serialize(currentNamespace, out);
 	}
@@ -117,27 +116,6 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	}
 
 	@Override
-	public void dispose() {
-		// ignore because we don't hold any state ourselves
-	}
-
-	@Override
-	public SD getStateDescriptor() {
-		return stateDesc;
-	}
-
-	@Override
-	public void setCurrentKey(K key) {
-		// ignore because we don't hold any state ourselves
-	}
-
-	@Override
-	public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> snapshot(long checkpointId,
-			long timestamp) throws Exception {
-		throw new RuntimeException("Should not be called. Backups happen in RocksDBStateBackend.");
-	}
-
-	@Override
 	@SuppressWarnings("unchecked")
 	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
 		// Serialized key and namespace is expected to be of the same format

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index f1cf409..8c0799b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -66,7 +66,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
 	public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			FoldingStateDescriptor<T, ACC> stateDesc,
-			RocksDBStateBackend backend) {
+			RocksDBKeyedStateBackend backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
new file mode 100644
index 0000000..63f1fa2
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
+ * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
+ * checkpointing. This state backend can store very large state that exceeds memory and spills
+ * to disk.
+ */
+public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
+
+	/** Operator identifier that is used to uniquify the RocksDB storage path. */
+	private final String operatorIdentifier;
+
+	/** JobID for uniquifying backup paths. */
+	private final JobID jobId;
+
+	/** The options from the options factory, cached */
+	private final ColumnFamilyOptions columnOptions;
+
+	/** Path where this configured instance stores its data directory */
+	private final File instanceBasePath;
+
+	/** Path where this configured instance stores its RocksDB data base */
+	private final File instanceRocksDBPath;
+
+	/**
+	 * Our RocksDB database; it is used by the actual subclasses of {@link AbstractRocksDBState}
+	 * to store state. The different k/v states that we have don't each have their own RocksDB
+	 * instance. They all write to this instance but to their own column family.
+	 */
+	protected volatile RocksDB db;
+
+	/**
+	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
+	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
+	 * iterating over a disposed db.
+	 */
+	private final SerializableObject dbCleanupLock = new SerializableObject();
+
+	/**
+	 * Information about the k/v states as we create them. This is used to retrieve the
+	 * column family that is used for a state and also for sanity checks when restoring.
+	 */
+	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
+
+	public RocksDBKeyedStateBackend(
+			JobID jobId,
+			String operatorIdentifier,
+			File instanceBasePath,
+			DBOptions dbOptions,
+			ColumnFamilyOptions columnFamilyOptions,
+			TaskKvStateRegistry kvStateRegistry,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange
+	) throws Exception {
+
+		super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+
+		this.operatorIdentifier = operatorIdentifier;
+		this.jobId = jobId;
+		this.columnOptions = columnFamilyOptions;
+
+		this.instanceBasePath = instanceBasePath;
+		this.instanceRocksDBPath = new File(instanceBasePath, "db");
+
+		RocksDB.loadLibrary();
+
+		if (!instanceBasePath.exists()) {
+			if (!instanceBasePath.mkdirs()) {
+				throw new RuntimeException("Could not create RocksDB data directory.");
+			}
+		}
+
+		// clean it, this will remove the last part of the path but RocksDB will recreate it
+		try {
+			if (instanceRocksDBPath.exists()) {
+				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
+				FileUtils.deleteDirectory(instanceRocksDBPath);
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
+		}
+
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
+		// RocksDB seems to need this...
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+		try {
+			db = RocksDB.open(dbOptions, instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error while opening RocksDB instance.", e);
+		}
+
+		kvStateInformation = new HashMap<>();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+
+		// we have to lock because we might have an asynchronous checkpoint going on
+		synchronized (dbCleanupLock) {
+			if (db != null) {
+				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
+					column.f0.dispose();
+				}
+
+				db.dispose();
+				db = null;
+			}
+		}
+
+		FileUtils.deleteDirectory(instanceBasePath);
+	}
+
+	@Override
+	public Future<KeyGroupsStateHandle> snapshot(
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory) throws Exception {
+		throw new RuntimeException("Not implemented.");
+	}
+
+	// ------------------------------------------------------------------------
+	//  State factories
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
+	 * we don't restore the individual k/v states, just the global RocksDB database and the
+	 * list of column families. When a k/v state is first requested we check here whether we
+	 * already have a column family for that and return it or create a new one if it doesn't exist.
+	 *
+	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
+	 * that we checkpointed, i.e. is already in the map of column families.
+	 */
+	protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) {
+
+		Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
+
+		if (stateInfo != null) {
+			if (!stateInfo.f1.equals(descriptor)) {
+				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
+			}
+			return stateInfo.f0;
+		}
+
+		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), columnOptions);
+
+		try {
+			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
+			kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
+			return columnFamily;
+		} catch (RocksDBException e) {
+			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
+		}
+	}
+
+	@Override
+	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
+			ValueStateDescriptor<T> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+
+	@Override
+	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
+			ListStateDescriptor<T> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+
+	@Override
+	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
+			ReducingStateDescriptor<T> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+
+	@Override
+	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+
+		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+
+		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index ff1038e..d8f937b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -67,8 +67,8 @@ public class RocksDBListState<K, N, V>
 	public RocksDBListState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<V> stateDesc,
-			RocksDBStateBackend backend) {
-		
+			RocksDBKeyedStateBackend backend) {
+
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index efa2931..15ae493 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -65,8 +65,8 @@ public class RocksDBReducingState<K, N, V>
 	public RocksDBReducingState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ReducingStateDescriptor<V> stateDesc,
-			RocksDBStateBackend backend) {
-		
+			RocksDBKeyedStateBackend backend) {
+
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();
 		this.reduceFunction = stateDesc.getReduceFunction();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 74276c0..62b71d9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -17,64 +17,31 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.common.state.StateBackend;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.streaming.util.HDFSCopyFromLocal;
-import org.apache.flink.streaming.util.HDFSCopyToLocal;
-import org.apache.hadoop.fs.FileSystem;
-import org.rocksdb.BackupEngine;
-import org.rocksdb.BackupableDBOptions;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.Env;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RestoreOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
@@ -83,12 +50,12 @@ import static java.util.Objects.requireNonNull;
 /**
  * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
  * store very large state that exceeds memory and spills to disk.
- * 
+ *
  * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
  * For persistence against loss of machines, checkpoints take a snapshot of the
  * RocksDB database, and persist that snapshot in a file system (by default) or
  * another configurable state backend.
- * 
+ *
  * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
  * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
  * {@link #setOptions(OptionsFactory)}.
@@ -101,15 +68,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	// ------------------------------------------------------------------------
 	//  Static configuration values
 	// ------------------------------------------------------------------------
-	
-	/** The checkpoint directory that we copy the RocksDB backups to. */
-	private final Path checkpointDirectory;
-
-	/** The state backend that stores the non-partitioned state */
-	private final AbstractStateBackend nonPartitionedStateBackend;
 
-	/** Whether we do snapshots fully asynchronous */
-	private boolean fullyAsyncBackup = false;
+	/** The state backend that we use for creating checkpoint streams. */
+	private final AbstractStateBackend checkpointStreamBackend;
 
 	/** Operator identifier that is used to uniqueify the RocksDB storage path. */
 	private String operatorIdentifier;
@@ -118,66 +79,35 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	private JobID jobId;
 
 	// DB storage directories
-	
+
 	/** Base paths for RocksDB directory, as configured. May be null. */
 	private Path[] configuredDbBasePaths;
 
 	/** Base paths for RocksDB directory, as initialized */
 	private File[] initializedDbBasePaths;
-	
+
 	private int nextDirectory;
-	
+
 	// RocksDB options
-	
+
 	/** The pre-configured option settings */
 	private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
-	
+
 	/** The options factory to create the RocksDB options in the cluster */
 	private OptionsFactory optionsFactory;
-	
+
 	/** The options from the options factory, cached */
 	private transient DBOptions dbOptions;
 	private transient ColumnFamilyOptions columnOptions;
 
-	// ------------------------------------------------------------------------
-	//  Per operator values that are set in initializerForJob
-	// ------------------------------------------------------------------------
-
-	/** Path where this configured instance stores its data directory */
-	private transient File instanceBasePath;
-
-	/** Path where this configured instance stores its RocksDB data base */
-	private transient File instanceRocksDBPath;
-
-	/** Base path where this configured instance stores checkpoints */
-	private transient String instanceCheckpointPath;
-
-	/**
-	 * Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
-	 * to store state. The different k/v states that we have don't each have their own RocksDB
-	 * instance. They all write to this instance but to their own column family.
-	 */
-	protected volatile transient RocksDB db;
-
-	/**
-	 * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous
-	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
-	 * iterating over a disposed db.
-	 */
-	private final SerializableObject dbCleanupLock = new SerializableObject();
+	/** Whether we already lazily initialized our local storage directories. */
+	private transient boolean isInitialized = false;
 
-	/**
-	 * Information about the k/v states as we create them. This is used to retrieve the
-	 * column family that is used for a state and also for sanity checks when restoring.
-	 */
-	private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation;
-
-	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
 	 * file system and location defined by the given URI.
-	 * 
+	 *
 	 * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
 	 * host and port in the URI, or have the Hadoop configuration that describes the file system
 	 * (host / high-availability group / possibly credentials) either referenced from the Flink
@@ -205,38 +135,42 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
 		// creating the FsStateBackend automatically sanity checks the URI
 		FsStateBackend fsStateBackend = new FsStateBackend(checkpointDataUri);
-		
-		this.nonPartitionedStateBackend = fsStateBackend;
-		this.checkpointDirectory = fsStateBackend.getBasePath();
+
+		this.checkpointStreamBackend = fsStateBackend;
 	}
 
 
 	public RocksDBStateBackend(String checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
 		this(new Path(checkpointDataUri).toUri(), nonPartitionedStateBackend);
 	}
-	
-	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend nonPartitionedStateBackend) throws IOException {
-		this.nonPartitionedStateBackend = requireNonNull(nonPartitionedStateBackend);
-		this.checkpointDirectory = FsStateBackend.validateAndNormalizeUri(checkpointDataUri);
+
+	public RocksDBStateBackend(URI checkpointDataUri, AbstractStateBackend checkpointStreamBackend) throws IOException {
+		this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.defaultWriteObject();
 	}
 
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		ois.defaultReadObject();
+		isInitialized = false;
+	}
 	// ------------------------------------------------------------------------
 	//  State backend methods
 	// ------------------------------------------------------------------------
-	
-	@Override
-	public void initializeForJob(
-			Environment env, 
-			String operatorIdentifier,
-			TypeSerializer<?> keySerializer) throws Exception {
-		
-		super.initializeForJob(env, operatorIdentifier, keySerializer);
 
-		this.nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
-		
+	private void lazyInitializeForJob(
+			Environment env,
+			String operatorIdentifier) throws Exception {
+
+		if (isInitialized) {
+			return;
+		}
+
 		this.operatorIdentifier = operatorIdentifier.replace(" ", "");
 		this.jobId = env.getJobID();
-		
+
 		// initialize the paths where the local RocksDB files should be stored
 		if (configuredDbBasePaths == null) {
 			// initialize from the temp directories
@@ -245,7 +179,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		else {
 			List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
 			String errorMessage = "";
-			
+
 			for (Path path : configuredDbBasePaths) {
 				File f = new File(path.toUri().getPath());
 				File testDir = new File(f, UUID.randomUUID().toString());
@@ -259,672 +193,78 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				}
 				testDir.delete();
 			}
-			
+
 			if (dirs.isEmpty()) {
 				throw new Exception("No local storage directories available. " + errorMessage);
 			} else {
 				initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
 			}
 		}
-		
-		nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
-
-		instanceBasePath = new File(getDbPath("dummy_state"), UUID.randomUUID().toString());
-		instanceCheckpointPath = getCheckpointPath("dummy_state");
-		instanceRocksDBPath = new File(instanceBasePath, "db");
-
-		RocksDB.loadLibrary();
-
-		if (!instanceBasePath.exists()) {
-			if (!instanceBasePath.mkdirs()) {
-				throw new RuntimeException("Could not create RocksDB data directory.");
-			}
-		}
-
-		// clean it, this will remove the last part of the path but RocksDB will recreate it
-		try {
-			if (instanceRocksDBPath.exists()) {
-				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
-				FileUtils.deleteDirectory(instanceRocksDBPath);
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
-		}
-
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
-		// RocksDB seems to need this...
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-		try {
-			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
-		} catch (RocksDBException e) {
-			throw new RuntimeException("Error while opening RocksDB instance.", e);
-		}
-
-		kvStateInformation = new HashMap<>();
-	}
-
-	@Override
-	public void disposeAllStateForCurrentJob() throws Exception {
-		nonPartitionedStateBackend.disposeAllStateForCurrentJob();
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		super.discardState();
-		nonPartitionedStateBackend.discardState();
-
-		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
-
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
-				}
-
-				db.dispose();
-				db = null;
-			}
-		}
-	}
 
-	@Override
-	public void close() throws Exception {
-		nonPartitionedStateBackend.close();
-
-		// we have to lock because we might have an asynchronous checkpoint going on
-		synchronized (dbCleanupLock) {
-			if (db != null) {
-				if (this.dbOptions != null) {
-					this.dbOptions.dispose();
-					this.dbOptions = null;
-				}
-
-				for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) {
-					column.f0.dispose();
-				}
-
-				db.dispose();
-				db = null;
-			}
-		}
-	}
+		nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
 
-	private File getDbPath(String stateName) {
-		return new File(new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier), stateName);
+		isInitialized = true;
 	}
 
-	private String getCheckpointPath(String stateName) {
-		return checkpointDirectory + "/" + jobId.toString() + "/" + operatorIdentifier + "/" + stateName;
+	private File getDbPath() {
+		return new File(new File(getNextStoragePath(), jobId.toString()), operatorIdentifier);
 	}
 
 	private File getNextStoragePath() {
 		int ni = nextDirectory + 1;
 		ni = ni >= initializedDbBasePaths.length ? 0 : ni;
 		nextDirectory = ni;
-		
-		return initializedDbBasePaths[ni];
-	}
 
-	/**
-	 * Visible for tests.
-	 */
-	public File[] getStoragePaths() {
-		return initializedDbBasePaths;
+		return initializedDbBasePaths[ni];
 	}
 
-	// ------------------------------------------------------------------------
-	//  Snapshot and restore
-	// ------------------------------------------------------------------------
-
 	@Override
-	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
-		if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
+	public CheckpointStreamFactory createStreamFactory(JobID jobId,
+			String operatorIdentifier) throws IOException {
 			return null;
 		}
 
 		if (fullyAsyncBackup) {
 			return performFullyAsyncSnapshot(checkpointId, timestamp);
 		} else {
-			return performSemiAsyncSnapshot(checkpointId, timestamp);
-		}
-	}
-
-	/**
-	 * Performs a checkpoint by using the RocksDB backup feature to backup to a directory.
-	 * This backup is the asynchronously copied to the final checkpoint location.
-	 */
-	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performSemiAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
-		// We don't snapshot individual k/v states since everything is stored in a central
-		// RocksDB data base. Create a dummy KvStateSnapshot that holds the information about
-		// that checkpoint. We use the in injectKeyValueStateSnapshots to restore.
-
-		final File localBackupPath = new File(instanceBasePath, "local-chk-" + checkpointId);
-		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
-
-		if (!localBackupPath.exists()) {
-			if (!localBackupPath.mkdirs()) {
-				throw new RuntimeException("Could not create local backup path " + localBackupPath);
-			}
-		}
-
-		long startTime = System.currentTimeMillis();
-
-		BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
-		// we disabled the WAL
-		backupOptions.setBackupLogFiles(false);
-		// no need to sync since we use the backup only as intermediate data before writing to FileSystem snapshot
-		backupOptions.setSync(false);
-
-		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions)) {
-			// wait before flush with "true"
-			backupEngine.createNewBackup(db, true);
-		}
-
-		long endTime = System.currentTimeMillis();
-		LOG.info("RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
-
-		// draw a copy in case it get's changed while performing the async snapshot
-		List<StateDescriptor> kvStateInformationCopy = new ArrayList<>();
-		for (Tuple2<ColumnFamilyHandle, StateDescriptor> state: kvStateInformation.values()) {
-			kvStateInformationCopy.add(state.f1);
-		}
-		SemiAsyncSnapshot dummySnapshot = new SemiAsyncSnapshot(localBackupPath,
-				backupUri,
-				kvStateInformationCopy,
-				checkpointId);
-
-
-		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
-		result.put("dummy_state", dummySnapshot);
-		return result;
-	}
-
-	/**
-	 * Performs a checkpoint by drawing a {@link org.rocksdb.Snapshot} from RocksDB and then
-	 * iterating over all key/value pairs in RocksDB to store them in the final checkpoint
-	 * location. The only synchronous part is the drawing of the {@code Snapshot} which
-	 * is essentially free.
-	 */
-	private HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> performFullyAsyncSnapshot(long checkpointId, long timestamp) throws Exception {
-		// we draw a snapshot from RocksDB then iterate over all keys at that point
-		// and store them in the backup location
-
-		final URI backupUri = new URI(instanceCheckpointPath + "/chk-" + checkpointId);
-
-		long startTime = System.currentTimeMillis();
-
-		org.rocksdb.Snapshot snapshot = db.getSnapshot();
-
-		long endTime = System.currentTimeMillis();
-		LOG.info("Fully asynchronous RocksDB (" + instanceRocksDBPath + ") backup (synchronous part) took " + (endTime - startTime) + " ms.");
-
-		// draw a copy in case it get's changed while performing the async snapshot
-		Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamiliesCopy = new HashMap<>();
-		columnFamiliesCopy.putAll(kvStateInformation);
-		FullyAsyncSnapshot dummySnapshot = new FullyAsyncSnapshot(snapshot,
-				this,
-				backupUri,
-				columnFamiliesCopy,
-				checkpointId);
-
-
-		HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> result = new HashMap<>();
-		result.put("dummy_state", dummySnapshot);
-		return result;
+		return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
 	}
 
 	@Override
-	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
-		if (keyValueStateSnapshots == null) {
-			return;
-		}
-
-		KvStateSnapshot dummyState = keyValueStateSnapshots.get("dummy_state");
-		if (dummyState instanceof FinalSemiAsyncSnapshot) {
-			restoreFromSemiAsyncSnapshot((FinalSemiAsyncSnapshot) dummyState);
-		} else if (dummyState instanceof FinalFullyAsyncSnapshot) {
-			restoreFromFullyAsyncSnapshot((FinalFullyAsyncSnapshot) dummyState);
-		} else {
-			throw new RuntimeException("Unknown RocksDB snapshot: " + dummyState);
-		}
-	}
-
-	private void restoreFromSemiAsyncSnapshot(FinalSemiAsyncSnapshot snapshot) throws Exception {
-		// This does mostly the same work as initializeForJob, we remove the existing RocksDB
-		// directory and create a new one from the backup.
-		// This must be refactored. The StateBackend should either be initialized from
-		// scratch or from a snapshot.
-
-		if (!instanceBasePath.exists()) {
-			if (!instanceBasePath.mkdirs()) {
-				throw new RuntimeException("Could not create RocksDB data directory.");
-			}
-		}
-
-		db.dispose();
-
-		// clean it, this will remove the last part of the path but RocksDB will recreate it
-		try {
-			if (instanceRocksDBPath.exists()) {
-				LOG.warn("Deleting already existing db directory {}.", instanceRocksDBPath);
-				FileUtils.deleteDirectory(instanceRocksDBPath);
-			}
-		} catch (IOException e) {
-			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
-		}
-
-		final File localBackupPath = new File(instanceBasePath, "chk-" + snapshot.checkpointId);
-
-		if (localBackupPath.exists()) {
-			try {
-				LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
-				FileUtils.deleteDirectory(localBackupPath);
-			} catch (IOException e) {
-				throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
-			}
-		}
-
-		HDFSCopyToLocal.copyToLocal(snapshot.backupUri, instanceBasePath);
-
-		try (BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), new BackupableDBOptions(localBackupPath.getAbsolutePath()))) {
-			backupEngine.restoreDbFromLatestBackup(instanceRocksDBPath.getAbsolutePath(), instanceRocksDBPath.getAbsolutePath(), new RestoreOptions(true));
-		} catch (RocksDBException|IllegalArgumentException e) {
-			throw new RuntimeException("Error while restoring RocksDB state from " + localBackupPath, e);
-		} finally {
-			try {
-				FileUtils.deleteDirectory(localBackupPath);
-			} catch (IOException e) {
-				LOG.error("Error cleaning up local restore directory " + localBackupPath, e);
-			}
-		}
-
-
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(snapshot.stateDescriptors.size());
-		for (StateDescriptor stateDescriptor: snapshot.stateDescriptors) {
-			columnFamilyDescriptors.add(new ColumnFamilyDescriptor(stateDescriptor.getName().getBytes(), getColumnOptions()));
-		}
-
-		// RocksDB seems to need this...
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(snapshot.stateDescriptors.size());
-		try {
-
-			db = RocksDB.open(getDbOptions(), instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
-			this.kvStateInformation = new HashMap<>();
-			for (int i = 0; i < snapshot.stateDescriptors.size(); i++) {
-				this.kvStateInformation.put(snapshot.stateDescriptors.get(i).getName(), new Tuple2<>(columnFamilyHandles.get(i), snapshot.stateDescriptors.get(i)));
-			}
-
-		} catch (RocksDBException e) {
-			throw new RuntimeException("Error while opening RocksDB instance.", e);
-		}
-	}
-
-	private void restoreFromFullyAsyncSnapshot(FinalFullyAsyncSnapshot snapshot) throws Exception {
-
-		DataInputView inputView = new DataInputViewStreamWrapper(snapshot.stateHandle.openInputStream());
-
-		// clear k/v state information before filling it
-		kvStateInformation.clear();
-
-		// first get the column family mapping
-		int numColumns = inputView.readInt();
-		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
-		for (int i = 0; i < numColumns; i++) {
-			byte mappingByte = inputView.readByte();
-
-			ObjectInputStream ooIn = new ObjectInputStream(new DataInputViewStream(inputView));
-			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
-
-			columnFamilyMapping.put(mappingByte, stateDescriptor);
-
-			// this will fill in the k/v state information
-			getColumnFamily(stateDescriptor);
-		}
-
-		// try and read until EOF
-		try {
-			// the EOFException will get us out of this...
-			while (true) {
-				byte mappingByte = inputView.readByte();
-				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
-				byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
-				byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
-				db.put(handle, key, value);
-			}
-		} catch (EOFException e) {
-			// expected
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Semi-asynchronous Backup Classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Upon snapshotting the RocksDB backup is created synchronously. The asynchronous part is
-	 * copying the backup to a (possibly) remote filesystem. This is done in {@link #materialize()}.
-	 */
-	private static class SemiAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-		private final File localBackupPath;
-		private final URI backupUri;
-		private final List<StateDescriptor> stateDescriptors;
-		private final long checkpointId;
-
-		private SemiAsyncSnapshot(File localBackupPath,
-				URI backupUri,
-				List<StateDescriptor> columnFamilies,
-				long checkpointId) {
-			this.localBackupPath = localBackupPath;
-			this.backupUri = backupUri;
-			this.stateDescriptors = columnFamilies;
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
-				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
-				long endTime = System.currentTimeMillis();
-				LOG.info("RocksDB materialization from " + localBackupPath + " to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return new FinalSemiAsyncSnapshot(backupUri, checkpointId, stateDescriptors);
-			} catch (Exception e) {
-				FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-				fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
-				throw e;
-			} finally {
-				FileUtils.deleteQuietly(localBackupPath);
-			}
-		}
-	}
-
-	/**
-	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB data base. This
-	 * also stores the column families that we had at the time of the snapshot so that we can
-	 * restore these. This results from {@link SemiAsyncSnapshot}.
-	 */
-	private static class FinalSemiAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-
-		final URI backupUri;
-		final long checkpointId;
-		private final List<StateDescriptor> stateDescriptors;
-
-		/**
-		 * Creates a new snapshot from the given state parameters.
-		 */
-		private FinalSemiAsyncSnapshot(URI backupUri, long checkpointId, List<StateDescriptor> stateDescriptors) {
-			this.backupUri = backupUri;
-			this.checkpointId = checkpointId;
-			this.stateDescriptors = stateDescriptors;
-		}
-
-		@Override
-		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
-				RocksDBStateBackend stateBackend,
-				TypeSerializer<Object> keySerializer,
-				ClassLoader classLoader) throws Exception {
-			throw new RuntimeException("Should never happen.");
-		}
-
-		@Override
-		public final void discardState() throws Exception {
-			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-			fs.delete(new org.apache.hadoop.fs.Path(backupUri), true);
-		}
-
-		@Override
-		public final long getStateSize() throws Exception {
-			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
-			return fs.getContentSummary(new org.apache.hadoop.fs.Path(backupUri)).getLength();
-		}
-
-		@Override
-		public void close() throws IOException {
-			// cannot do much here
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Fully asynchronous Backup Classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This does the snapshot using a RocksDB snapshot and an iterator over all keys
-	 * at the point of that snapshot.
-	 */
-	private class FullyAsyncSnapshot extends AsynchronousKvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-
-		private transient org.rocksdb.Snapshot snapshot;
-		private transient AbstractStateBackend backend;
-
-		private final URI backupUri;
-		private final Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies;
-		private final long checkpointId;
-
-		private FullyAsyncSnapshot(org.rocksdb.Snapshot snapshot,
-				AbstractStateBackend backend,
-				URI backupUri,
-				Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> columnFamilies,
-				long checkpointId) {
-			this.snapshot = snapshot;
-			this.backend = backend;
-			this.backupUri = backupUri;
-			this.columnFamilies = columnFamilies;
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> materialize() throws Exception {
-			try {
-				long startTime = System.currentTimeMillis();
-
-				CheckpointStateOutputStream outputStream = backend.createCheckpointStateOutputStream(checkpointId, startTime);
-				DataOutputView outputView = new DataOutputViewStreamWrapper(outputStream);
-				outputView.writeInt(columnFamilies.size());
-
-				// We don't know how many key/value pairs there are in each column family.
-				// We prefix every written element with a byte that signifies to which
-				// column family it belongs, so that we can restore the column families.
-				byte count = 0;
-				Map<String, Byte> columnFamilyMapping = new HashMap<>();
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					columnFamilyMapping.put(column.getKey(), count);
-
-					outputView.writeByte(count);
-
-					ObjectOutputStream ooOut = new ObjectOutputStream(outputStream);
-					ooOut.writeObject(column.getValue().f1);
-					ooOut.flush();
-
-					count++;
-				}
-
-				ReadOptions readOptions = new ReadOptions();
-				readOptions.setSnapshot(snapshot);
-
-				for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column: columnFamilies.entrySet()) {
-					byte columnByte = columnFamilyMapping.get(column.getKey());
-
-					synchronized (dbCleanupLock) {
-						if (db == null) {
-							throw new RuntimeException("RocksDB instance was disposed. This happens " +
-									"when we are in the middle of a checkpoint and the job fails.");
-						}
-						RocksIterator iterator = db.newIterator(column.getValue().f0, readOptions);
-						iterator.seekToFirst();
-						while (iterator.isValid()) {
-							outputView.writeByte(columnByte);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.key(),
-									outputView);
-							BytePrimitiveArraySerializer.INSTANCE.serialize(iterator.value(),
-									outputView);
-							iterator.next();
-						}
-					}
-				}
-
-				StreamStateHandle stateHandle = outputStream.closeAndGetHandle();
-
-				long endTime = System.currentTimeMillis();
-				LOG.info("Fully asynchronous RocksDB materialization to " + backupUri + " (asynchronous part) took " + (endTime - startTime) + " ms.");
-				return new FinalFullyAsyncSnapshot(stateHandle, checkpointId);
-			} finally {
-				synchronized (dbCleanupLock) {
-					if (db != null) {
-						db.releaseSnapshot(snapshot);
-					}
-				}
-				snapshot = null;
-			}
-		}
-
-	}
-
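For reference, the stream layout produced by materialize() above is: an int with the number of column families; then, per column family, a tag byte followed by a Java-serialized StateDescriptor; then a flat sequence of (tag byte, serialized key, serialized value) records. Below is a minimal sketch of a reader for that layout (not code from this commit; the input stream 'in' and the helper 'restoreColumnFamily' are assumptions for illustration, and the imports mirror those already used on the write side):

    // Sketch only: mirrors the write order in FullyAsyncSnapshot#materialize().
    DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);

    int numColumns = inView.readInt();

    // Rebuild the tag byte -> StateDescriptor mapping from the header.
    Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
    for (int i = 0; i < numColumns; i++) {
        byte mappingByte = inView.readByte();
        // One ObjectInputStream per descriptor, matching the per-descriptor
        // ObjectOutputStream on the write side.
        ObjectInputStream ooIn = new ObjectInputStream(in);
        StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
        columnFamilyMapping.put(mappingByte, stateDescriptor);
    }

    // Read tagged key/value records until the stream is exhausted.
    try {
        while (true) {
            byte columnByte = inView.readByte();
            byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inView);
            byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inView);
            restoreColumnFamily(columnFamilyMapping.get(columnByte), key, value);
        }
    } catch (EOFException ignored) {
        // end of the checkpoint stream
    }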
-	/**
-	 * Dummy {@link KvStateSnapshot} that holds the state of our one RocksDB database. This
-	 * results from {@link FullyAsyncSnapshot}.
-	 */
-	private static class FinalFullyAsyncSnapshot implements KvStateSnapshot<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> {
-		private static final long serialVersionUID = 1L;
-
-		final StreamStateHandle stateHandle;
-		final long checkpointId;
-
-		/**
-		 * Creates a new snapshot from the given state parameters.
-		 */
-		private FinalFullyAsyncSnapshot(StreamStateHandle stateHandle, long checkpointId) {
-			this.stateHandle = stateHandle;
-			this.checkpointId = checkpointId;
-		}
-
-		@Override
-		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
-				RocksDBStateBackend stateBackend,
-				TypeSerializer<Object> keySerializer,
-				ClassLoader classLoader) throws Exception {
-			throw new RuntimeException("Should never happen.");
-		}
-
-		@Override
-		public final void discardState() throws Exception {
-			stateHandle.discardState();
-		}
-
-		@Override
-		public final long getStateSize() throws Exception {
-			return stateHandle.getStateSize();
-		}
-
-		@Override
-		public void close() throws IOException {
-			stateHandle.close();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  State factories
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
-	 * we don't restore the individual k/v states, just the global RocksDB database and the
-	 * list of column families. When a k/v state is first requested, we check here whether we
-	 * already have a column family for it and return that, or create a new one if none exists.
-	 *
-	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
-	 * that we checkpointed, i.e. is already in the map of column families.
-	 */
-	protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor)  {
-
-		Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName());
-
-		if (stateInfo != null) {
-			if (!stateInfo.f1.equals(descriptor)) {
-				throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 + " trying access with " + descriptor);
-			}
-			return stateInfo.f0;
-		}
-
-		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(descriptor.getName().getBytes(), getColumnOptions());
-
-		try {
-			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
-			kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor));
-			return columnFamily;
-		} catch (RocksDBException e) {
-			throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
-		}
-	}
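To make the descriptor check above concrete, here is a hypothetical mismatch (the state name, serializers, and default values are invented for illustration):

    // Two descriptors that share the name "count" but differ in type.
    ValueStateDescriptor<Long> first =
        new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
    ValueStateDescriptor<String> second =
        new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "");

    getColumnFamily(first);   // creates and registers the "count" column family
    getColumnFamily(second);  // throws: descriptor does not match the registered one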
-
-	/**
-	 * Used by k/v states to access the current key.
-	 */
-	public Object currentKey() {
-		return currentKey;
-	}
-
-	/**
-	 * Used by k/v states to access the key serializer.
-	 */
-	public TypeSerializer keySerializer() {
-		return keySerializer;
-	}
-
-	@Override
-	protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
-
-		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
-
-		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
-
-		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
+	public <K> KeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
 
-	@Override
-	protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+		lazyInitializeForJob(env, operatorIdentifier);
 
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+		File instanceBasePath = new File(getDbPath(), UUID.randomUUID().toString());
 
-		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
+		return new RocksDBKeyedStateBackend<>(
+				jobID,
+				operatorIdentifier,
+				instanceBasePath,
+				getDbOptions(),
+				getColumnOptions(),
+				kvStateRegistry,
+				keySerializer,
+				keyGroupAssigner,
+				keyGroupRange);
 	}
 
-	// ------------------------------------------------------------------------
-	//  Non-partitioned state
-	// ------------------------------------------------------------------------
-
 	@Override
-	public CheckpointStateOutputStream createCheckpointStateOutputStream(
-			long checkpointID, long timestamp) throws Exception {
-		
-		return nonPartitionedStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
+	public <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			KeyGroupAssigner<K> keyGroupAssigner,
+			KeyGroupRange keyGroupRange,
+			List<KeyGroupsStateHandle> restoredState,
+			TaskKvStateRegistry kvStateRegistry) throws Exception {
+		throw new UnsupportedOperationException("Not implemented.");
 	}
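As a rough usage sketch for the new factory method (not part of this commit: 'env', 'kvStateRegistry', 'checkpointDir', and 'maxParallelism' are assumed to come from the surrounding task code, and HashKeyGroupAssigner is just one possible assigner implementation):

    RocksDBStateBackend backend =
        new RocksDBStateBackend(checkpointDir.toURI(), new MemoryStateBackend());

    KeyedStateBackend<String> keyedBackend = backend.createKeyedStateBackend(
        env,
        env.getJobID(),
        "example-operator",
        StringSerializer.INSTANCE,
        new HashKeyGroupAssigner<String>(maxParallelism),
        new KeyGroupRange(0, maxParallelism - 1),
        kvStateRegistry);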
 
 	// ------------------------------------------------------------------------
@@ -932,35 +272,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Enables fully asynchronous snapshotting of the partitioned state held in RocksDB.
-	 *
-	 * <p>By default, this is disabled. This means that RocksDB state is copied in a synchronous
-	 * step, during which normal processing of elements pauses, followed by an asynchronous step
-	 * of copying the RocksDB backup to the final checkpoint location. Fully asynchronous
-	 * snapshots take longer (linear time requirement with respect to number of unique keys)
-	 * but normal processing of elements is not paused.
-	 */
-	public void enableFullyAsyncSnapshots() {
-		this.fullyAsyncBackup = true;
-	}
-
-	/**
-	 * Disables fully asynchronous snapshotting of the partitioned state held in RocksDB.
-	 *
-	 * <p>By default, this is disabled.
-	 */
-	public void disableFullyAsyncSnapshots() {
-		this.fullyAsyncBackup = false;
-	}
-
-	/**
 	 * Sets the path where the RocksDB local database files should be stored on the local
 	 * file system. Setting this path overrides the default behavior, where the
 	 * files are stored across the configured temp directories.
-	 * 
+	 *
 	 * <p>Passing {@code null} to this function restores the default behavior, where the configured
 	 * temp directories will be used.
-	 * 
+	 *
 	 * @param path The path where the local RocksDB database files are stored.
 	 */
 	public void setDbStoragePath(String path) {
@@ -971,44 +289,44 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * Sets the paths across which the local RocksDB database files are distributed on the local
 	 * file system. Setting these paths overrides the default behavior, where the
 	 * files are stored across the configured temp directories.
-	 * 
+	 *
 	 * <p>Each distinct state will be stored in one path, but when the state backend creates
 	 * multiple states, their files will be spread across the different paths.
-	 * 
+	 *
 	 * <p>Passing {@code null} to this function restores the default behavior, where the configured
 	 * temp directories will be used.
-	 * 
-	 * @param paths The paths across which the local RocksDB database files will be spread. 
+	 *
+	 * @param paths The paths across which the local RocksDB database files will be spread.
 	 */
 	public void setDbStoragePaths(String... paths) {
 		if (paths == null) {
 			configuredDbBasePaths = null;
-		} 
+		}
 		else if (paths.length == 0) {
 			throw new IllegalArgumentException("empty paths");
 		}
 		else {
 			Path[] pp = new Path[paths.length];
-			
+
 			for (int i = 0; i < paths.length; i++) {
 				if (paths[i] == null) {
 					throw new IllegalArgumentException("null path");
 				}
-				
+
 				pp[i] = new Path(paths[i]);
 				String scheme = pp[i].toUri().getScheme();
 				if (scheme != null && !scheme.equalsIgnoreCase("file")) {
 					throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
 				}
 			}
-			
+
 			configuredDbBasePaths = pp;
 		}
 	}
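For example (hypothetical paths; only the local "file" scheme passes the validation above):

    backend.setDbStoragePaths("/disk1/flink/rocksdb", "/disk2/flink/rocksdb");

    // Rejected by the scheme check above:
    // backend.setDbStoragePaths("hdfs:///flink/rocksdb");  // IllegalArgumentException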
 
 	/**
-	 * 
-	 * @return The configured DB storage paths, or null, if none were configured. 
+	 * Gets the configured DB storage paths.
+	 *
+	 * @return The configured DB storage paths, or null, if none were configured.
 	 */
 	public String[] getDbStoragePaths() {
 		if (configuredDbBasePaths == null) {
@@ -1021,18 +339,18 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			return paths;
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Parametrize with RocksDB Options
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Sets the predefined options for RocksDB.
-	 * 
+	 *
 	 * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
 	 * then the options from the factory are applied on top of the here specified
 	 * predefined options.
-	 * 
+	 *
 	 * @param options The options to set (must not be null).
 	 */
 	public void setPredefinedOptions(PredefinedOptions options) {
@@ -1043,10 +361,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * Gets the currently set predefined options for RocksDB.
 	 * The default options (if nothing was set via {@link #setPredefinedOptions(PredefinedOptions)})
 	 * are {@link PredefinedOptions#DEFAULT}.
-	 * 
+	 *
 	 * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
 	 * then the options from the factory are applied on top of the predefined options.
-	 * 
+	 *
 	 * @return The currently set predefined options for RocksDB.
 	 */
 	public PredefinedOptions getPredefinedOptions() {
@@ -1057,13 +375,13 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * Sets {@link org.rocksdb.Options} for the RocksDB instances.
 	 * Because the options are not serializable and hold native code references,
 	 * they must be specified through a factory.
-	 * 
-	 * <p>The options created by the factory here are applied on top of the pre-defined 
+	 *
+	 * <p>The options created by the factory here are applied on top of the pre-defined
 	 * options profile selected via {@link #setPredefinedOptions(PredefinedOptions)}.
 	 * If the pre-defined options profile is the default
 	 * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls the RocksDB
 	 * options.
-	 * 
+	 *
 	 * @param optionsFactory The options factory that lazily creates the RocksDB options.
 	 */
 	public void setOptions(OptionsFactory optionsFactory) {
@@ -1072,7 +390,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	/**
 	 * Gets the options factory that lazily creates the RocksDB options.
-	 * 
+	 *
 	 * @return The options factory.
 	 */
 	public OptionsFactory getOptions() {
@@ -1091,7 +409,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			if (optionsFactory != null) {
 				opt = optionsFactory.createDBOptions(opt);
 			}
-			
+
 			// add necessary default options
 			opt = opt.setCreateIfMissing(true);
 

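To illustrate how the predefined options and the options factory compose (the factory refines the options derived from the predefined profile, as described in the javadocs above), here is a sketch. The createColumnOptions hook and the concrete tuning values are assumptions for illustration; only the createDBOptions call is visible in the hunk above:

    backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    backend.setOptions(new OptionsFactory() {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // applied on top of the options from the predefined profile
            return currentOptions.setMaxBackgroundFlushes(4);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions.setWriteBufferSize(64 * 1024 * 1024);
        }
    });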
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 62bc366..b9c0e83 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -63,7 +63,7 @@ public class RocksDBValueState<K, N, V>
 	public RocksDBValueState(ColumnFamilyHandle columnFamily,
 			TypeSerializer<N> namespaceSerializer,
 			ValueStateDescriptor<V> stateDesc,
-			RocksDBStateBackend backend) {
+			RocksDBKeyedStateBackend backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
 		this.valueSerializer = stateDesc.getSerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java
deleted file mode 100644
index 7861542..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/FullyAsyncRocksDBStateBackendTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.state.StateBackendTestBase;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Assume;
-import org.junit.Before;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.UUID;
-
-/**
- * Tests for the partitioned state part of {@link RocksDBStateBackend} with fully asynchronous
- * checkpointing enabled.
- */
-public class FullyAsyncRocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
-
-	private File dbDir;
-	private File chkDir;
-
-	@Before
-	public void checkOperatingSystem() {
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	@Override
-	protected RocksDBStateBackend getStateBackend() throws IOException {
-		dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
-		backend.setDbStoragePath(dbDir.getAbsolutePath());
-		backend.enableFullyAsyncSnapshots();
-		return backend;
-	}
-
-	@Override
-	protected void cleanup() {
-		try {
-			FileUtils.deleteDirectory(dbDir);
-			FileUtils.deleteDirectory(chkDir);
-		} catch (IOException ignore) {}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
index d720c6d..0e35b60 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncKVSnapshotTest.java
@@ -88,7 +88,6 @@ public class RocksDBAsyncKVSnapshotTest {
 	 * test will simply lock forever.
 	 */
 	@Test
-	@Ignore
 	public void testAsyncCheckpoints() throws Exception {
 		LocalFileSystem localFS = new LocalFileSystem();
 		localFS.initialize(new URI("file:///"), new Configuration());
@@ -191,7 +190,6 @@ public class RocksDBAsyncKVSnapshotTest {
 	 * test will simply lock forever.
 	 */
 	@Test
-	@Ignore
 	public void testFullyAsyncCheckpoints() throws Exception {
 		LocalFileSystem localFS = new LocalFileSystem();
 		localFS.initialize(new URI("file:///"), new Configuration());
@@ -218,7 +216,7 @@ public class RocksDBAsyncKVSnapshotTest {
 
 		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
 		backend.setDbStoragePath(dbDir.getAbsolutePath());
-		backend.enableFullyAsyncSnapshots();
+		// enableFullyAsyncSnapshots() no longer exists on RocksDBStateBackend
 
 		streamConfig.setStateBackend(backend);