You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:30 UTC

[1/8] flink git commit: [hotfix] Harden RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints

Repository: flink
Updated Branches:
  refs/heads/master 0bbc91eb1 -> 7a91f3071


[hotfix] Harden RocksDBAsyncSnapshotTest#testCancelFullyAsyncCheckpoints

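Depending on RocksDBOptions#TIMER_SERVICE_IMPL, testCancelFullyAsyncCheckpoints has to adapt how many
checkpoint streams it skips: with the HEAP timer service the first created stream is used for the
synchronous timer snapshot and must be skipped, whereas with ROCKSDB no stream needs to be skipped.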


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a91f307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a91f307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a91f307

Branch: refs/heads/master
Commit: 7a91f30711473d670173107d7517c9b5acaa9d3f
Parents: 6bba0e7
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 16:55:49 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBAsyncSnapshotTest.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a91f307/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index fe1a625..a8e5a36 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -256,16 +256,28 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 
 		File dbDir = temporaryFolder.newFolder();
 
+		final RocksDBStateBackend.PriorityQueueStateType timerServicePriorityQueueType = RocksDBStateBackend.PriorityQueueStateType.valueOf(RocksDBOptions.TIMER_SERVICE_IMPL.defaultValue());
+
+		final int skipStreams;
+
+		if (timerServicePriorityQueueType == RocksDBStateBackend.PriorityQueueStateType.HEAP) {
+			// we skip the first created stream, because it is used to checkpoint the timer service, which is
+			// currently not asynchronous.
+			skipStreams = 1;
+		} else if (timerServicePriorityQueueType == RocksDBStateBackend.PriorityQueueStateType.ROCKSDB) {
+			skipStreams = 0;
+		} else {
+			throw new AssertionError(String.format("Unknown timer service priority queue type %s.", timerServicePriorityQueueType));
+		}
+
 		// this is the proper instance that we need to call.
 		BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
 			new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
 
-			int count = 1;
+			int count = skipStreams;
 
 			@Override
 			public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws IOException {
-				// we skip the first created stream, because it is used to checkpoint the timer service, which is
-				// currently not asynchronous.
 				if (count > 0) {
 					--count;
 					return new BlockingCheckpointOutputStream(


[8/8] flink git commit: [hotfix] Consolidate RocksDB configuration options in RocksDBOptions

Posted by tr...@apache.org.
[hotfix] Consolidate RocksDB configuration options in RocksDBOptions

Rename the configuration key backend.rocksdb.priority_queue_state_type to state.backend.rocksdb.timer-service.impl.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4b4cb70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4b4cb70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4b4cb70

Branch: refs/heads/master
Commit: a4b4cb70a5a26acd31294041c8a15479e69ef3bc
Parents: a88d6ef
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:29:37 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../generated/checkpointing_configuration.html  |  5 ---
 .../generated/rocks_db_configuration.html       | 21 +++++++++
 .../configuration/CheckpointingOptions.java     | 11 -----
 flink-docs/pom.xml                              |  5 +++
 .../ConfigOptionsDocGenerator.java              |  3 +-
 .../streaming/state/RockDBBackendOptions.java   | 38 ----------------
 .../contrib/streaming/state/RocksDBOptions.java | 47 ++++++++++++++++++++
 .../streaming/state/RocksDBStateBackend.java    |  6 +--
 .../state/RocksDBStateBackendFactoryTest.java   |  6 +--
 9 files changed, 81 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/checkpointing_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/checkpointing_configuration.html b/docs/_includes/generated/checkpointing_configuration.html
index 7a2791f..8f5ce7b 100644
--- a/docs/_includes/generated/checkpointing_configuration.html
+++ b/docs/_includes/generated/checkpointing_configuration.html
@@ -33,11 +33,6 @@
             <td></td>
         </tr>
         <tr>
-            <td><h5>state.backend.rocksdb.localdir</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
-        </tr>
-        <tr>
             <td><h5>state.checkpoints.dir</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).</td>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/docs/_includes/generated/rocks_db_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
new file mode 100644
index 0000000..57b9511
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -0,0 +1,21 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>state.backend.rocksdb.localdir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
+            <td style="word-wrap: break-word;">"HEAP"</td>
+            <td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 60b7613..6557a9f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -115,15 +115,4 @@ public class CheckpointingOptions {
 			.defaultValue(1024)
 			.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
 				" inline in the root checkpoint metadata file.");
-
-	// ------------------------------------------------------------------------
-	//  Options specific to the RocksDB state backend
-	// ------------------------------------------------------------------------
-
-	/** The local directory (on the TaskManager) where RocksDB puts its files. */
-	public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
-			.key("state.backend.rocksdb.localdir")
-			.noDefaultValue()
-			.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
-			.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/pom.xml
----------------------------------------------------------------------
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 2135dea..b7709f5 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -83,6 +83,11 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 
 		<dependency>
 			<groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 953122f..d333719 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -59,7 +59,8 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
-		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
+		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+		new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
 	};
 
 	static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
deleted file mode 100644
index ede45e3..0000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * Configuration options for the RocksDB backend.
- */
-public class RockDBBackendOptions {
-
-	/**
-	 * Choice of implementation for priority queue state (e.g. timers).
-	 */
-	public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
-		.key("backend.rocksdb.priority_queue_state_type")
-		.defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
-		.withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are" +
-			"either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
-			RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for in implementation based on RocksDB.");
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
new file mode 100644
index 0000000..37eb6cf
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RocksDBOptions {
+
+	/** The local directory (on the TaskManager) where RocksDB puts its files. */
+	public static final ConfigOption<String> LOCAL_DIRECTORIES = ConfigOptions
+		.key("state.backend.rocksdb.localdir")
+		.noDefaultValue()
+		.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
+		.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
+
+	/**
+	 * Choice of timer service implementation.
+	 */
+	public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
+		.key("state.backend.rocksdb.timer-service.impl")
+		.defaultValue(HEAP.name())
+		.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
+			"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 1794e17..58e8de6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -60,7 +60,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
+import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
 			config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
-		final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
 
 		this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
 			PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
@@ -281,7 +281,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 			this.localRocksDbDirectories = original.localRocksDbDirectories;
 		}
 		else {
-			final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+			final String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
 			if (rocksdbLocalPaths != null) {
 				String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4b4cb70/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
index 7612c4c..9e16dea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -82,14 +82,14 @@ public class RocksDBStateBackendFactoryTest {
 		config1.setString(backendKey, "rocksdb");
 		config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config1.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
 		config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
 
 		final Configuration config2 = new Configuration();
 		config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
 		config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
 		config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
-		config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+		config2.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
 		config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
 
 		StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
@@ -143,7 +143,7 @@ public class RocksDBStateBackendFactoryTest {
 		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
 		config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental);  // this should not be picked up
-		config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
+		config.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDir3 + ":" + localDir4);  // this should not be picked up
 
 		final StateBackend loadedBackend =
 				StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);


[7/8] flink git commit: [hotfix] Use default value of RocksDBOptions#TIMER_SERVICE_IMPL

Posted by tr...@apache.org.
[hotfix] Use default value of RocksDBOptions#TIMER_SERVICE_IMPL

In order to fully enable RocksDBOptions#TIMER_SERVICE_IMPL, we should fall back to the default value
of this config option instead of the empty string "".


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bba0e77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bba0e77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bba0e77

Branch: refs/heads/master
Commit: 6bba0e77ed0d98b8de24f457605b82c69980a0d4
Parents: 8db5ca6
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 15:07:31 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/contrib/streaming/state/RocksDBStateBackend.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bba0e77/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index c6b50cd..0564849 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -271,7 +271,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 		this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
 			config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
-		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");
+		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL);
 
 		this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
 			PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;


[5/8] flink git commit: [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

Posted by tr...@apache.org.
[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

Optimization for relaxed bulk polls

Deactivate the optimization for now because it still contains a bug.

This closes #6333.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbddf00b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbddf00b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbddf00b

Branch: refs/heads/master
Commit: dbddf00b75032c20df6e7aef26814da392347194
Parents: 0bbc91e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 13 11:56:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../state/AbstractKeyedStateBackend.java        |   5 +
 .../state/BackendWritableBroadcastState.java    |   4 +-
 .../state/DefaultOperatorStateBackend.java      |  32 +--
 .../flink/runtime/state/HeapBroadcastState.java |  10 +-
 .../runtime/state/KeyExtractorFunction.java     |  13 ++
 .../runtime/state/KeyGroupPartitioner.java      |  72 +++++-
 .../org/apache/flink/runtime/state/Keyed.java   |  32 +++
 .../flink/runtime/state/PriorityComparable.java |  33 +++
 .../flink/runtime/state/PriorityComparator.java |   7 +
 .../runtime/state/PriorityQueueSetFactory.java  |   6 +-
 ...RegisteredBroadcastBackendStateMetaInfo.java | 155 -------------
 ...RegisteredBroadcastStateBackendMetaInfo.java | 169 ++++++++++++++
 .../RegisteredKeyValueStateBackendMetaInfo.java | 224 +++++++++++++++++++
 .../RegisteredKeyedBackendStateMetaInfo.java    | 212 ------------------
 .../RegisteredOperatorBackendStateMetaInfo.java | 142 ------------
 .../RegisteredOperatorStateBackendMetaInfo.java | 153 +++++++++++++
 ...steredPriorityQueueStateBackendMetaInfo.java |  87 +++++++
 .../state/RegisteredStateMetaInfoBase.java      |  17 ++
 .../flink/runtime/state/StateSnapshot.java      |  27 ++-
 .../state/StateSnapshotKeyGroupReader.java      |  44 ++++
 .../runtime/state/StateSnapshotRestore.java     |  47 ++++
 .../state/TieBreakingPriorityComparator.java    |   5 -
 .../heap/CachingInternalPriorityQueueSet.java   |  46 ++++
 .../state/heap/CopyOnWriteStateTable.java       |  13 +-
 .../heap/CopyOnWriteStateTableSnapshot.java     |  12 +-
 .../state/heap/HeapKeyedStateBackend.java       | 163 +++++++++-----
 .../runtime/state/heap/HeapPriorityQueue.java   |   3 +-
 .../state/heap/HeapPriorityQueueSetFactory.java |  18 +-
 ...HeapPriorityQueueSnapshotRestoreWrapper.java | 102 +++++++++
 .../heap/HeapPriorityQueueStateSnapshot.java    | 118 ++++++++++
 .../heap/KeyGroupPartitionedPriorityQueue.java  |  18 ++
 .../state/heap/NestedMapsStateTable.java        |  38 ++--
 .../flink/runtime/state/heap/StateTable.java    |  25 ++-
 .../state/heap/StateTableByKeyGroupReader.java  |  38 ----
 .../state/heap/StateTableByKeyGroupReaders.java |  89 +++-----
 .../state/metainfo/StateMetaInfoSnapshot.java   |   5 +-
 .../StateMetaInfoSnapshotReadersWriters.java    |  38 +++-
 .../state/KeyGroupPartitionerTestBase.java      |   4 +-
 .../runtime/state/SerializationProxiesTest.java |  54 ++---
 .../state/StateSnapshotCompressionTest.java     |   9 +-
 .../state/heap/CopyOnWriteStateTableTest.java   |  32 +--
 .../StateTableSnapshotCompatibilityTest.java    |  17 +-
 .../StateMetaInfoSnapshotEnumConstantsTest.java |   4 +-
 .../state/ttl/mock/MockKeyedStateBackend.java   |  19 +-
 .../state/RocksDBAggregatingState.java          |   4 +-
 .../streaming/state/RocksDBFoldingState.java    |   4 +-
 .../state/RocksDBKeyedStateBackend.java         | 188 +++++++---------
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../streaming/state/RocksDBMapState.java        |   4 +-
 .../streaming/state/RocksDBOrderedSetStore.java |   1 -
 .../streaming/state/RocksDBReducingState.java   |   4 +-
 .../streaming/state/RocksDBValueState.java      |   4 +-
 .../state/RocksDBWriteBatchWrapper.java         |   4 +
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../HeapPriorityQueueStateSnapshot.java         | 112 ----------
 .../operators/InternalTimeServiceManager.java   |  31 ++-
 .../streaming/api/operators/InternalTimer.java  |  15 +-
 .../InternalTimersSnapshotReaderWriters.java    | 126 ++++++++++-
 .../StreamTaskStateInitializerImpl.java         |   3 +-
 .../api/operators/TimerHeapInternalTimer.java   | 125 +----------
 .../api/operators/TimerSerializer.java          |  31 ++-
 .../operators/HeapInternalTimerServiceTest.java |   4 +-
 62 files changed, 1807 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c7f1bd9..17d24f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -315,4 +315,9 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	@VisibleForTesting
 	public abstract int numStateEntries();
 
+	// TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
+	public boolean requiresLegacySynchronousTimerSnapshots() {
+		return false;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
index 8daf07c..ba3985b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
@@ -36,7 +36,7 @@ public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V
 
 	long write(FSDataOutputStream out) throws IOException;
 
-	void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo);
+	void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo);
 
-	RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
+	RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index dc9b75f..f1d0b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -209,7 +209,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 		if (broadcastState == null) {
 			broadcastState = new HeapBroadcastState<>(
-					new RegisteredBroadcastBackendStateMetaInfo<>(
+					new RegisteredBroadcastStateBackendMetaInfo<>(
 							name,
 							OperatorStateHandle.Mode.BROADCAST,
 							broadcastStateKeySerializer,
@@ -227,7 +227,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
 
 			@SuppressWarnings("unchecked")
-			RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
+			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
 
 			// check compatibility to determine if state migration is required
 			CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
@@ -247,7 +247,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
 				// new serializer is compatible; use it to replace the old serializer
 				broadcastState.setStateMetaInfo(
-						new RegisteredBroadcastBackendStateMetaInfo<>(
+						new RegisteredBroadcastStateBackendMetaInfo<>(
 								name,
 								OperatorStateHandle.Mode.BROADCAST,
 								broadcastStateKeySerializer,
@@ -510,8 +510,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 				// Recreate all PartitionableListStates from the meta info
 				for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
 
-					final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
-						new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+					final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+						new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
 
 					if (restoredMetaInfo.getPartitionStateSerializer() == null ||
 						restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
@@ -546,8 +546,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 				for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
 
-					final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
-						new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
+					final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+						new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);
 
 					if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
 						restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
@@ -613,7 +613,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		/**
 		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+		private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;
 
 		/**
 		 * The internal list the holds the elements of the state
@@ -625,12 +625,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		 */
 		private final ArrayListSerializer<S> internalListCopySerializer;
 
-		PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+		PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 			this(stateMetaInfo, new ArrayList<S>());
 		}
 
 		private PartitionableListState(
-				RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
+				RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
 				ArrayList<S> internalList) {
 
 			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
@@ -643,11 +643,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
-		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+		public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 			this.stateMetaInfo = stateMetaInfo;
 		}
 
-		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+		public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
 			return stateMetaInfo;
 		}
 
@@ -741,7 +741,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			// no restored state for the state name; simply create new state holder
 
 			partitionableListState = new PartitionableListState<>(
-				new RegisteredOperatorBackendStateMetaInfo<>(
+				new RegisteredOperatorStateBackendMetaInfo<>(
 					name,
 					partitionStateSerializer,
 					mode));
@@ -757,8 +757,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					mode);
 
 			StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
-			RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
-				new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+			RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
+				new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
 
 			// check compatibility to determine if state migration is required
 			TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
@@ -772,7 +772,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			if (!stateCompatibility.isRequiresMigration()) {
 				// new serializer is compatible; use it to replace the old serializer
 				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
+					new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
 			} else {
 				// TODO state migration currently isn't possible.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 7ebf1ce..a262103 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -42,7 +42,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	/**
 	 * Meta information of the state, including state name, assignment mode, and serializer.
 	 */
-	private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
+	private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
 
 	/**
 	 * The internal map the holds the elements of the state.
@@ -54,11 +54,11 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	 */
 	private final MapSerializer<K, V> internalMapCopySerializer;
 
-	HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+	HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 		this(stateMetaInfo, new HashMap<>());
 	}
 
-	private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
+	private HeapBroadcastState(final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
 
 		this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
 		this.backingMap = Preconditions.checkNotNull(internalMap);
@@ -70,12 +70,12 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	}
 
 	@Override
-	public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+	public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 		this.stateMetaInfo = stateMetaInfo;
 	}
 
 	@Override
-	public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() {
+	public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
 		return stateMetaInfo;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
index a3ce11c..79fafc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -28,9 +28,32 @@ import javax.annotation.Nonnull;
 @FunctionalInterface
 public interface KeyExtractorFunction<T> {
 
+	/**
+	 * Extractor for elements that implement {@link Keyed}: the key of an element is {@link Keyed#getKey()}.
+	 * Use {@link #forKeyedObjects()} to obtain an instance typed for a concrete element type.
+	 */
+	KeyExtractorFunction<? extends Keyed<?>> FOR_KEYED_OBJECTS = new KeyExtractorFunction<Keyed<?>>() {
+		@Nonnull
+		@Override
+		public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
+			return element.getKey();
+		}
+	};
+
 	/**
 	 * Returns the key for the given element by which the key-group can be computed.
 	 */
 	@Nonnull
 	Object extractKeyFromElement(@Nonnull T element);
+
+	/**
+	 * Returns {@link #FOR_KEYED_OBJECTS}, typed for the element type {@code T}.
+	 *
+	 * @param <T> type of the elements, which must implement {@link Keyed}.
+	 */
+	@SuppressWarnings("unchecked")
+	static <T extends Keyed<?>> KeyExtractorFunction<T> forKeyedObjects() {
+		// The shared extractor only calls Keyed#getKey(), which every T provides, so this cast is safe.
+		return (KeyExtractorFunction<T>) FOR_KEYED_OBJECTS;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 6a9dfb5..27d411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
@@ -28,7 +29,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
- * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * Class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
  * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
  * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
  * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
@@ -89,7 +90,7 @@ public class KeyGroupPartitioner<T> {
 
 	/** Cached result. */
 	@Nullable
-	protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;
+	protected StateSnapshot.StateKeyGroupWriter computedResult;
 
 	/**
 	 * Creates a new {@link KeyGroupPartitioner}.
@@ -131,7 +132,7 @@ public class KeyGroupPartitioner<T> {
 	/**
 	 * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
 	 */
-	public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+	public StateSnapshot.StateKeyGroupWriter partitionByKeyGroup() {
 		if (computedResult == null) {
 			reportAllElementKeyGroups();
 			buildHistogramByAccumulatingCounts();
@@ -198,7 +199,7 @@ public class KeyGroupPartitioner<T> {
 	 * This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
 	 * w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
 	 */
-	public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+	private static class PartitioningResult<T> implements StateSnapshot.StateKeyGroupWriter {
 
 		/**
 		 * Function to write one element to a {@link DataOutputView}.
@@ -249,7 +250,7 @@ public class KeyGroupPartitioner<T> {
 		}
 
 		@Override
-		public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+		public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
 
 			int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
 			int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);
@@ -264,6 +265,43 @@ public class KeyGroupPartitioner<T> {
 		}
 	}
 
+	/** Creates a reader for key-groups written by the result of {@link #partitionByKeyGroup()}. */
+	public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
+			@Nonnull ElementReaderFunction<T> readerFunction,
+			@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+		return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
+	}
+
+	/**
+	 * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}.
+	 *
+	 * @param <T> type of the elements to read.
+	 */
+	private static class PartitioningResultKeyGroupReader<T> implements StateSnapshotKeyGroupReader {
+
+		@Nonnull
+		private final ElementReaderFunction<T> readerFunction;
+
+		@Nonnull
+		private final KeyGroupElementsConsumer<T> elementConsumer;
+
+		public PartitioningResultKeyGroupReader(
+			@Nonnull ElementReaderFunction<T> readerFunction,
+			@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+			this.readerFunction = Preconditions.checkNotNull(readerFunction);
+			this.elementConsumer = Preconditions.checkNotNull(elementConsumer);
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(@Nonnull DataInputView in, @Nonnegative int keyGroupId) throws IOException {
+			int numElements = in.readInt();
+			for (int i = 0; i < numElements; i++) {
+				T element = readerFunction.readElement(in);
+				elementConsumer.consume(element, keyGroupId);
+			}
+		}
+	}
+
 	/**
 	 * This functional interface defines how one element is written to a {@link DataOutputView}.
 	 *
@@ -281,4 +319,28 @@ public class KeyGroupPartitioner<T> {
 		 */
 		void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
 	}
+
+	/**
+	 * This functional interface defines how one element is read from a {@link DataInputView}.
+	 *
+	 * @param <T> type of the read elements.
+	 */
+	@FunctionalInterface
+	public interface ElementReaderFunction<T> {
+
+		@Nonnull
+		T readElement(@Nonnull DataInputView div) throws IOException;
+	}
+
+	/**
+	 * Functional interface to consume elements from a key group.
+	 *
+	 * @param <T> type of the consumed elements.
+	 */
+	@FunctionalInterface
+	public interface KeyGroupElementsConsumer<T> {
+
+		/** Consumes one element that was read from the key-group with the given id. */
+		void consume(@Nonnull T element, @Nonnegative int keyGroupId) throws IOException;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
new file mode 100644
index 0000000..4320b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Interface for objects that have a key attribute.
+ *
+ * @param <K> type of the key.
+ */
+public interface Keyed<K> {
+
+	/**
+	 * Returns the key attribute.
+	 *
+	 * @return the key of this object.
+	 */
+	K getKey();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
new file mode 100644
index 0000000..4d6cce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface for objects that can be compared by priority.
+ *
+ * @param <T> type of the compared objects.
+ */
+public interface PriorityComparable<T> {
+
+	/**
+	 * Compares this object with the given object for priority. Returns a negative integer, zero, or a positive
+	 * integer as this object has lower, equal to, or higher priority than the given object.
+	 *
+	 * @param other the object to compare this object with; must not be null.
+	 * @return a negative integer, zero, or a positive integer as this object has lower, equal to, or higher
+	 * priority than the given object.
+	 * @see PriorityComparator#comparePriority(Object, Object)
+	 */
+	int comparePriorityTo(@Nonnull T other);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
index 2f6f5a7..ec36924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -30,6 +30,8 @@ package org.apache.flink.runtime.state;
 @FunctionalInterface
 public interface PriorityComparator<T> {
 
+	PriorityComparator<? extends PriorityComparable<Object>> FOR_PRIORITY_COMPARABLE_OBJECTS = PriorityComparable::comparePriorityTo;
+
 	/**
 	 * Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
 	 * argument has lower, equal to, or higher priority than the second.
@@ -39,4 +41,9 @@ public interface PriorityComparator<T> {
 	 * priority than the second.
 	 */
 	int comparePriority(T left, T right);
+
+	@SuppressWarnings("unchecked")
+	static <T extends PriorityComparable<?>> PriorityComparator<T> forPriorityComparableObjects() {
+		return (PriorityComparator<T>) FOR_PRIORITY_COMPARABLE_OBJECTS;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index 6f509c0..2245e72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -38,9 +38,7 @@ public interface PriorityQueueSetFactory {
 	 * @return the queue with the specified unique name.
 	 */
 	@Nonnull
-	<T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	<T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementPriorityComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor);
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
deleted file mode 100644
index 98a8195..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class RegisteredBroadcastBackendStateMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
-
-	/** The mode how elements in this state are assigned to tasks during restore. */
-	private final OperatorStateHandle.Mode assignmentMode;
-
-	/** The type serializer for the keys in the map state. */
-	private final TypeSerializer<K> keySerializer;
-
-	/** The type serializer for the values in the map state. */
-	private final TypeSerializer<V> valueSerializer;
-
-	public RegisteredBroadcastBackendStateMetaInfo(
-			final String name,
-			final OperatorStateHandle.Mode assignmentMode,
-			final TypeSerializer<K> keySerializer,
-			final TypeSerializer<V> valueSerializer) {
-
-		super(name);
-		Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST);
-		this.assignmentMode = assignmentMode;
-		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
-	}
-
-	public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
-		this(
-			Preconditions.checkNotNull(copy).name,
-			copy.assignmentMode,
-			copy.keySerializer.duplicate(),
-			copy.valueSerializer.duplicate());
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			snapshot.getName(),
-			OperatorStateHandle.Mode.valueOf(
-				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-			(TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-			(TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
-	}
-
-	/**
-	 * Creates a deep copy of the itself.
-	 */
-	public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
-		return new RegisteredBroadcastBackendStateMetaInfo<>(this);
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-			assignmentMode.toString());
-		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
-		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		serializerMap.put(keySerializerKey, keySerializer.duplicate());
-		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
-		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-
-	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
-	}
-
-	public TypeSerializer<V> getValueSerializer() {
-		return valueSerializer;
-	}
-
-	public OperatorStateHandle.Mode getAssignmentMode() {
-		return assignmentMode;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-
-		if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
-			return false;
-		}
-
-		final RegisteredBroadcastBackendStateMetaInfo other =
-				(RegisteredBroadcastBackendStateMetaInfo) obj;
-
-		return Objects.equals(name, other.getName())
-				&& Objects.equals(assignmentMode, other.getAssignmentMode())
-				&& Objects.equals(keySerializer, other.getKeySerializer())
-				&& Objects.equals(valueSerializer, other.getValueSerializer());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + assignmentMode.hashCode();
-		result = 31 * result + keySerializer.hashCode();
-		result = 31 * result + valueSerializer.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredBroadcastBackendStateMetaInfo{" +
-				"name='" + name + '\'' +
-				", keySerializer=" + keySerializer +
-				", valueSerializer=" + valueSerializer +
-				", assignmentMode=" + assignmentMode +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
new file mode 100644
index 0000000..02ab8ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered broadcast state in an operator state backend. This combines the state
+ * name, the assignment mode (which must be {@link OperatorStateHandle.Mode#BROADCAST}), and the serializers for keys
+ * and values.
+ *
+ * @param <K> Type of the keys in the broadcast state
+ * @param <V> Type of the values in the broadcast state
+ */
+public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
+
+	/** The mode how elements in this state are assigned to tasks during restore. */
+	@Nonnull
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/** The type serializer for the keys in the map state. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The type serializer for the values in the map state. */
+	@Nonnull
+	private final TypeSerializer<V> valueSerializer;
+
+	public RegisteredBroadcastStateBackendMetaInfo(
+			@Nonnull final String name,
+			@Nonnull final OperatorStateHandle.Mode assignmentMode,
+			@Nonnull final TypeSerializer<K> keySerializer,
+			@Nonnull final TypeSerializer<V> valueSerializer) {
+
+		super(name);
+		Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST);
+		this.assignmentMode = assignmentMode;
+		// @Nonnull is only a static hint: fail fast here rather than with an NPE later in hashCode() or snapshot().
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
+	}
+
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
+		this(
+			Preconditions.checkNotNull(copy).name,
+			copy.assignmentMode,
+			copy.keySerializer.duplicate(),
+			copy.valueSerializer.duplicate());
+	}
+
+	@SuppressWarnings("unchecked")
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			snapshot.getName(),
+			OperatorStateHandle.Mode.valueOf(
+				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
+			(TypeSerializer<K>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+			(TypeSerializer<V>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
+	}
+
+	/**
+	 * Creates a deep copy of itself.
+	 */
+	@Nonnull
+	public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
+		return new RegisteredBroadcastStateBackendMetaInfo<>(this);
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	@Nonnull
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Nonnull
+	public TypeSerializer<V> getValueSerializer() {
+		return valueSerializer;
+	}
+
+	@Nonnull
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (!(obj instanceof RegisteredBroadcastStateBackendMetaInfo)) {
+			return false;
+		}
+
+		final RegisteredBroadcastStateBackendMetaInfo<?, ?> other =
+				(RegisteredBroadcastStateBackendMetaInfo<?, ?>) obj;
+
+		return Objects.equals(name, other.getName())
+				&& Objects.equals(assignmentMode, other.getAssignmentMode())
+				&& Objects.equals(keySerializer, other.getKeySerializer())
+				&& Objects.equals(valueSerializer, other.getValueSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = name.hashCode();
+		result = 31 * result + assignmentMode.hashCode();
+		result = 31 * result + keySerializer.hashCode();
+		result = 31 * result + valueSerializer.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredBroadcastStateBackendMetaInfo{" +
+				"name='" + name + '\'' +
+				", keySerializer=" + keySerializer +
+				", valueSerializer=" + valueSerializer +
+				", assignmentMode=" + assignmentMode +
+				'}';
+	}
+
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+			assignmentMode.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(keySerializerKey, keySerializer.duplicate());
+		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
new file mode 100644
index 0000000..d49a05c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
+
+	@Nonnull
+	private final StateDescriptor.Type stateType;
+	@Nonnull
+	private final TypeSerializer<N> namespaceSerializer;
+	@Nonnull
+	private final TypeSerializer<S> stateSerializer;
+
+	/**
+	 * Creates the meta information for a keyed k/v state.
+	 *
+	 * @param stateType the kind of state, as declared by its {@link StateDescriptor}.
+	 * @param name the name of the state.
+	 * @param namespaceSerializer serializer for the state's namespace.
+	 * @param stateSerializer serializer for the state's value.
+	 * @throws NullPointerException if any argument is null.
+	 */
+	public RegisteredKeyValueStateBackendMetaInfo(
+			@Nonnull StateDescriptor.Type stateType,
+			@Nonnull String name,
+			@Nonnull TypeSerializer<N> namespaceSerializer,
+			@Nonnull TypeSerializer<S> stateSerializer) {
+
+		super(Preconditions.checkNotNull(name));
+		// @Nonnull is only a static-analysis hint; validate eagerly so a null fails at registration
+		// instead of later in snapshot(), equals() or hashCode().
+		this.stateType = Preconditions.checkNotNull(stateType);
+		this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+		this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+	}
+
+	/**
+	 * Restores the meta information from the given snapshot.
+	 *
+	 * @param snapshot the snapshot to restore from.
+	 * @throws NullPointerException if the snapshot lacks a namespace or value serializer.
+	 * @throws IllegalStateException if the snapshot's backend state type is not KEY_VALUE.
+	 */
+	@SuppressWarnings("unchecked")
+	public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
+			snapshot.getName(),
+			(TypeSerializer<N>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
+			(TypeSerializer<S>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(
+			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType(),
+			"Expected backend state type %s, but snapshot has %s.",
+			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+			snapshot.getBackendStateType());
+	}
+
+	/** Returns the kind of state this meta information describes. */
+	@Nonnull
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	/** Returns the serializer for the state's namespace. */
+	@Nonnull
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	/** Returns the serializer for the state's value. */
+	@Nonnull
+	public TypeSerializer<S> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		RegisteredKeyValueStateBackendMetaInfo<?, ?> that = (RegisteredKeyValueStateBackendMetaInfo<?, ?>) o;
+
+		if (!stateType.equals(that.stateType)) {
+			return false;
+		}
+
+		if (!getName().equals(that.getName())) {
+			return false;
+		}
+
+		return getStateSerializer().equals(that.getStateSerializer())
+				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredKeyValueStateBackendMetaInfo{" +
+				"stateType=" + stateType +
+				", name='" + name + '\'' +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				'}';
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getStateType().hashCode();
+		result = 31 * result + getNamespaceSerializer().hashCode();
+		result = 31 * result + getStateSerializer().hashCode();
+		return result;
+	}
+
+	/**
+	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
+	 * This checks that the descriptor specifies identical names and state types, as well as
+	 * serializers that are compatible for the restored k/v state bytes.
+	 *
+	 * <p>State types are only compared when neither the restored nor the new type is
+	 * {@link StateDescriptor.Type#UNKNOWN}.
+	 *
+	 * @param restoredStateMetaInfoSnapshot the meta info snapshot of the restored state.
+	 * @param newNamespaceSerializer the namespace serializer the state is re-registered with.
+	 * @param newStateDescriptor the descriptor the state is re-registered with.
+	 * @return meta information built from the new descriptor and the new namespace serializer.
+	 * @throws IllegalStateException if the state names differ, or if both state types are known and differ.
+	 * @throws StateMigrationException if the namespace or the value serializer would require state migration.
+	 */
+	@Nonnull
+	public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvStateCompatibility(
+		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
+		TypeSerializer<N> newNamespaceSerializer,
+		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+		Preconditions.checkState(
+			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			"Incompatible state names. " +
+				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + newStateDescriptor.getName() + "].");
+
+		final StateDescriptor.Type restoredType =
+			StateDescriptor.Type.valueOf(
+				restoredStateMetaInfoSnapshot.getOption(
+					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
+
+			Preconditions.checkState(
+				newStateDescriptor.getType() == restoredType,
+				"Incompatible state types. " +
+					"Was [" + restoredType + "], " +
+					"registered with [" + newStateDescriptor.getType() + "].");
+		}
+
+		// check compatibility results to determine if state migration is required
+		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			null,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			newNamespaceSerializer);
+
+		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			UnloadableDummyTypeSerializer.class,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			newStateSerializer);
+
+		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+			// TODO state migration currently isn't possible.
+			throw new StateMigrationException("State migration isn't supported, yet.");
+		} else {
+			return new RegisteredKeyValueStateBackendMetaInfo<>(
+				newStateDescriptor.getType(),
+				newStateDescriptor.getName(),
+				newNamespaceSerializer,
+				newStateSerializer);
+		}
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	/**
+	 * Builds a snapshot holding the state type as an option, plus duplicates and configuration
+	 * snapshots of the namespace and value serializers.
+	 */
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+			stateType.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
deleted file mode 100644
index e9b230a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredKeyedBackendStateMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
-
-	private final StateDescriptor.Type stateType;
-	private final TypeSerializer<N> namespaceSerializer;
-	private final TypeSerializer<S> stateSerializer;
-
-	public RegisteredKeyedBackendStateMetaInfo(
-			StateDescriptor.Type stateType,
-			String name,
-			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<S> stateSerializer) {
-
-		super(name);
-		this.stateType = checkNotNull(stateType);
-		this.namespaceSerializer = checkNotNull(namespaceSerializer);
-		this.stateSerializer = checkNotNull(stateSerializer);
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredKeyedBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
-			snapshot.getName(),
-			(TypeSerializer<N>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
-	}
-
-	public StateDescriptor.Type getStateType() {
-		return stateType;
-	}
-
-	public TypeSerializer<N> getNamespaceSerializer() {
-		return namespaceSerializer;
-	}
-
-	public TypeSerializer<S> getStateSerializer() {
-		return stateSerializer;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
-
-		if (!stateType.equals(that.stateType)) {
-			return false;
-		}
-
-		if (!getName().equals(that.getName())) {
-			return false;
-		}
-
-		return getStateSerializer().equals(that.getStateSerializer())
-				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredKeyedBackendStateMetaInfo{" +
-				"stateType=" + stateType +
-				", name='" + name + '\'' +
-				", namespaceSerializer=" + namespaceSerializer +
-				", stateSerializer=" + stateSerializer +
-				'}';
-	}
-
-	@Override
-	public int hashCode() {
-		int result = getName().hashCode();
-		result = 31 * result + getStateType().hashCode();
-		result = 31 * result + getNamespaceSerializer().hashCode();
-		result = 31 * result + getStateSerializer().hashCode();
-		return result;
-	}
-
-	/**
-	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
-	 * This checks that the descriptor specifies identical names and state types, as well as
-	 * serializers that are compatible for the restored k/v state bytes.
-	 */
-	public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
-		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
-		TypeSerializer<N> newNamespaceSerializer,
-		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
-
-		Preconditions.checkState(
-			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
-			"Incompatible state names. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
-				"registered with [" + newStateDescriptor.getName() + "].");
-
-		final StateDescriptor.Type restoredType =
-			StateDescriptor.Type.valueOf(
-				restoredStateMetaInfoSnapshot.getOption(
-					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
-		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
-			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
-			Preconditions.checkState(
-				newStateDescriptor.getType() == restoredType,
-				"Incompatible state types. " +
-					"Was [" + restoredType + "], " +
-					"registered with [" + newStateDescriptor.getType() + "].");
-		}
-
-		// check compatibility results to determine if state migration is required
-		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			null,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			newNamespaceSerializer);
-
-		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
-		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			UnloadableDummyTypeSerializer.class,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			newStateSerializer);
-
-		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-			// TODO state migration currently isn't possible.
-			throw new StateMigrationException("State migration isn't supported, yet.");
-		} else {
-			return new RegisteredKeyedBackendStateMetaInfo<>(
-				newStateDescriptor.getType(),
-				newStateDescriptor.getName(),
-				newNamespaceSerializer,
-				newStateSerializer);
-		}
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
-			stateType.toString());
-		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
-		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
-		serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
deleted file mode 100644
index f314add..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Compound meta information for a registered state in an operator state backend.
- * This contains the state name, assignment mode, and state partition serializer.
- *
- * @param <S> Type of the state.
- */
-public class RegisteredOperatorBackendStateMetaInfo<S> extends RegisteredStateMetaInfoBase {
-
-	/**
-	 * The mode how elements in this state are assigned to tasks during restore
-	 */
-	private final OperatorStateHandle.Mode assignmentMode;
-
-	/**
-	 * The type serializer for the elements in the state list
-	 */
-	private final TypeSerializer<S> partitionStateSerializer;
-
-	public RegisteredOperatorBackendStateMetaInfo(
-			String name,
-			TypeSerializer<S> partitionStateSerializer,
-			OperatorStateHandle.Mode assignmentMode) {
-		super(Preconditions.checkNotNull(name));
-		this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
-		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
-	}
-
-	private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
-		this(
-			Preconditions.checkNotNull(copy).name,
-			copy.partitionStateSerializer.duplicate(),
-			copy.assignmentMode);
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredOperatorBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			snapshot.getName(),
-			(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			OperatorStateHandle.Mode.valueOf(
-				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
-	}
-
-	/**
-	 * Creates a deep copy of the itself.
-	 */
-	public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
-		return new RegisteredOperatorBackendStateMetaInfo<>(this);
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-			assignmentMode.toString());
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		Map<String, TypeSerializer<?>> serializerMap =
-			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
-			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-
-	public OperatorStateHandle.Mode getAssignmentMode() {
-		return assignmentMode;
-	}
-
-	public TypeSerializer<S> getPartitionStateSerializer() {
-		return partitionStateSerializer;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-
-		if (obj == null) {
-			return false;
-		}
-
-		return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
-			&& name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
-			&& assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
-			&& partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = getName().hashCode();
-		result = 31 * result + getAssignmentMode().hashCode();
-		result = 31 * result + getPartitionStateSerializer().hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredOperatorBackendStateMetaInfo{" +
-			"name='" + name + "\'" +
-			", assignmentMode=" + assignmentMode +
-			", partitionStateSerializer=" + partitionStateSerializer +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
new file mode 100644
index 0000000..b65671e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {
+
+	/**
+	 * The mode how elements in this state are assigned to tasks during restore.
+	 */
+	@Nonnull
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/**
+	 * The type serializer for the elements in the state list.
+	 */
+	@Nonnull
+	private final TypeSerializer<S> partitionStateSerializer;
+
+	/**
+	 * Creates the meta information for an operator state.
+	 *
+	 * @param name the name of the state.
+	 * @param partitionStateSerializer serializer for the elements of the state.
+	 * @param assignmentMode how the state's elements are assigned to tasks during restore.
+	 * @throws NullPointerException if any argument is null.
+	 */
+	public RegisteredOperatorStateBackendMetaInfo(
+			@Nonnull String name,
+			@Nonnull TypeSerializer<S> partitionStateSerializer,
+			@Nonnull OperatorStateHandle.Mode assignmentMode) {
+		super(Preconditions.checkNotNull(name));
+		// @Nonnull is only a static-analysis hint; validate eagerly so a null fails here
+		// instead of later in snapshot(), equals() or hashCode().
+		this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+	}
+
+	/**
+	 * Copy constructor backing {@link #deepCopy()}. The partition state serializer is copied via
+	 * {@link TypeSerializer#duplicate()}; name and assignment mode are reused as-is.
+	 */
+	private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
+		this(
+			Preconditions.checkNotNull(copy).name,
+			copy.partitionStateSerializer.duplicate(),
+			copy.assignmentMode);
+	}
+
+	/**
+	 * Restores the meta information from the given snapshot.
+	 *
+	 * @param snapshot the snapshot to restore from.
+	 * @throws NullPointerException if the snapshot lacks a value serializer.
+	 * @throws IllegalStateException if the snapshot's backend state type is not OPERATOR.
+	 */
+	@SuppressWarnings("unchecked")
+	public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			snapshot.getName(),
+			(TypeSerializer<S>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+			OperatorStateHandle.Mode.valueOf(
+				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+		Preconditions.checkState(
+			StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType(),
+			"Expected backend state type %s, but snapshot has %s.",
+			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+			snapshot.getBackendStateType());
+	}
+
+	/**
+	 * Creates a deep copy of this meta information.
+	 */
+	@Nonnull
+	public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
+		return new RegisteredOperatorStateBackendMetaInfo<>(this);
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	/** Returns how elements of this state are assigned to tasks during restore. */
+	@Nonnull
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	/** Returns the serializer for the elements of this state. */
+	@Nonnull
+	public TypeSerializer<S> getPartitionStateSerializer() {
+		return partitionStateSerializer;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		// instanceof is false for null, so this also covers the null check.
+		if (!(obj instanceof RegisteredOperatorStateBackendMetaInfo)) {
+			return false;
+		}
+
+		final RegisteredOperatorStateBackendMetaInfo<?> other = (RegisteredOperatorStateBackendMetaInfo<?>) obj;
+		return name.equals(other.getName())
+			&& assignmentMode.equals(other.getAssignmentMode())
+			&& partitionStateSerializer.equals(other.getPartitionStateSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getAssignmentMode().hashCode();
+		result = 31 * result + getPartitionStateSerializer().hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredOperatorStateBackendMetaInfo{" +
+			"name='" + name + "\'" +
+			", assignmentMode=" + assignmentMode +
+			", partitionStateSerializer=" + partitionStateSerializer +
+			'}';
+	}
+
+	/**
+	 * Builds a snapshot holding the assignment mode as an option, plus a duplicate and a configuration
+	 * snapshot of the partition state serializer.
+	 */
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+			assignmentMode.toString());
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		Map<String, TypeSerializer<?>> serializerMap =
+			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
new file mode 100644
index 0000000..9ef23ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Meta information about a priority queue state in a backend.
+ *
+ * @param <T> type of the elements held in the priority queue.
+ */
+public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredStateMetaInfoBase {
+
+	@Nonnull
+	private final TypeSerializer<T> elementSerializer;
+
+	/**
+	 * Creates the meta information for a priority queue state.
+	 *
+	 * @param name the name of the state.
+	 * @param elementSerializer serializer for the queue's elements.
+	 * @throws NullPointerException if any argument is null.
+	 */
+	public RegisteredPriorityQueueStateBackendMetaInfo(
+		@Nonnull String name,
+		@Nonnull TypeSerializer<T> elementSerializer) {
+
+		super(Preconditions.checkNotNull(name));
+		// @Nonnull is only a static-analysis hint; validate eagerly so a null fails here
+		// instead of later in snapshot() or deepCopy().
+		this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
+	}
+
+	/**
+	 * Restores the meta information from the given snapshot.
+	 *
+	 * @param snapshot the snapshot to restore from.
+	 * @throws NullPointerException if the snapshot lacks a value serializer.
+	 * @throws IllegalStateException if the snapshot's backend state type is not PRIORITY_QUEUE.
+	 */
+	@SuppressWarnings("unchecked")
+	public RegisteredPriorityQueueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(snapshot.getName(),
+			(TypeSerializer<T>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(
+			StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType(),
+			"Expected backend state type %s, but snapshot has %s.",
+			StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE,
+			snapshot.getBackendStateType());
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	/** Returns the serializer for the queue's elements. */
+	@Nonnull
+	public TypeSerializer<T> getElementSerializer() {
+		return elementSerializer;
+	}
+
+	/**
+	 * Builds a snapshot with no options and a duplicate plus configuration snapshot of the element
+	 * serializer, stored under the common value-serializer key.
+	 */
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, TypeSerializer<?>> serializerMap =
+			Collections.singletonMap(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+				elementSerializer.duplicate());
+		Map<String, TypeSerializerConfigSnapshot> serializerSnapshotMap =
+			Collections.singletonMap(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+				elementSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE,
+			Collections.emptyMap(),
+			serializerSnapshotMap,
+			serializerMap);
+	}
+
+	/**
+	 * Creates a deep copy of this meta information. The element serializer is copied via
+	 * {@link TypeSerializer#duplicate()}.
+	 */
+	@Nonnull
+	public RegisteredPriorityQueueStateBackendMetaInfo<T> deepCopy() {
+		return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, elementSerializer.duplicate());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index 4132d14..b7dff59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,4 +42,21 @@ public abstract class RegisteredStateMetaInfoBase {
 
 	@Nonnull
 	public abstract StateMetaInfoSnapshot snapshot();
+
+	public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
+
+		final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
+		switch (backendStateType) {
+			case KEY_VALUE:
+				return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+			case OPERATOR:
+				return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+			case BROADCAST:
+				return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+			case PRIORITY_QUEUE:
+				return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
+			default:
+				throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
index 1fcac5c..54180f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -30,38 +32,45 @@ import java.io.IOException;
  * All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which
  * typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced
  * for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user
- * calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is
- * already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller
+ * calls {@link #getKeyGroupWriter()} to ensure that the snapshot is partitioned into key-groups. For state that is
+ * already partitioned, this can be a NOP. The returned {@link StateKeyGroupWriter} can be used by the caller
  * to write the state by key-group. As a last step, when the state is completely written, the user calls
  * {@link #release()}.
  */
+@Internal
 public interface StateSnapshot {
 
 	/**
-	 * This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}.
+	 * This method returns {@link StateKeyGroupWriter} and should be called in the asynchronous part of the snapshot.
 	 */
 	@Nonnull
-	KeyGroupPartitionedSnapshot partitionByKeyGroup();
+	StateKeyGroupWriter getKeyGroupWriter();
+
+	/**
+	 * Returns a snapshot of the state's meta data.
+	 */
+	@Nonnull
+	StateMetaInfoSnapshot getMetaInfoSnapshot();
 
 	/**
 	 * Release the snapshot. All snapshots should be released when they are no longer used because some implementation
-	 * can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used
+	 * can only release resources after a release. Produced {@link StateKeyGroupWriter} should no longer be used
 	 * after calling this method.
 	 */
 	void release();
 
 	/**
-	 * Interface for writing a snapshot after it is partitioned into key-groups.
+	 * Interface for writing a snapshot that is partitioned into key-groups.
 	 */
-	interface KeyGroupPartitionedSnapshot {
+	interface StateKeyGroupWriter {
 		/**
-		 * Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once
+		 * Writes the data for the specified key-group to the output. You must call {@link #getKeyGroupWriter()} once
 		 * before first calling this method.
 		 *
 		 * @param dov        the output.
 		 * @param keyGroupId the key-group to write.
 		 * @throws IOException on write-related problems.
 		 */
-		void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
+		void writeStateInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
new file mode 100644
index 0000000..b6b91ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.StateTable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+@Internal
+public interface StateSnapshotKeyGroupReader {
+
+	/**
+	 * Reads the data for the specified key-group from the given input view.
+	 *
+	 * @param div        the input view to read from
+	 * @param keyGroupId the key-group to read
+	 * @throws IOException on read-related problems
+	 */
+	void readMappingsInKeyGroup(@Nonnull DataInputView div, @Nonnegative int keyGroupId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
new file mode 100644
index 0000000..6fe50ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface for state that can produce a {@link StateSnapshot} and be restored per key-group.
+ * TODO find better name?
+ */
+@Internal
+public interface StateSnapshotRestore {
+
+	/**
+	 * Returns a {@link StateSnapshot} of the state.
+	 */
+	@Nonnull
+	StateSnapshot stateSnapshot();
+
+	/**
+	 * This method returns a {@link StateSnapshotKeyGroupReader} that can be used to restore the state on a
+	 * per-key-group basis. This method tries to return a reader for the given version hint.
+	 *
+	 * @param readVersionHint hint for the version of the serialized state that is going to be read.
+	 * @return a reader that reads state by key-groups, for the given read version.
+	 */
+	@Nonnull
+	StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
index 4384eb7..94aed45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -85,11 +85,6 @@ public class TieBreakingPriorityComparator<T> implements Comparator<T>, Priority
 			return ((Comparable<T>) o1).compareTo(o2);
 		}
 
-		// we catch this case before moving to more expensive tie breaks.
-		if (o1.equals(o2)) {
-			return 0;
-		}
-
 		// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
 		// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 6dc8cf3..b1ad0df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import static org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION;
+
 /**
  * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
  * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
@@ -80,6 +82,30 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
 
 	@Override
 	public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+			bulkPollRelaxedOrder(canConsume, consumer);
+		} else {
+			bulkPollStrictOrder(canConsume, consumer);
+		}
+	}
+
+	private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (orderedCache.isEmpty()) {
+			bulkPollStore(canConsume, consumer);
+		} else {
+			while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+				final E next = orderedCache.removeFirst();
+				orderedStore.remove(next);
+				consumer.accept(next);
+			}
+
+			if (orderedCache.isEmpty()) {
+				bulkPollStore(canConsume, consumer);
+			}
+		}
+	}
+
+	private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
 		E element;
 		while ((element = peek()) != null && canConsume.test(element)) {
 			poll();
@@ -87,6 +113,26 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
 		}
 	}
 
+	private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+			while (iterator.hasNext()) {
+				final E next = iterator.next();
+				if (canConsume.test(next)) {
+					orderedStore.remove(next);
+					consumer.accept(next);
+				} else {
+					orderedCache.add(next);
+					while (iterator.hasNext() && !orderedCache.isFull()) {
+						orderedCache.add(iterator.next());
+					}
+					break;
+				}
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Exception while bulk polling store.", e);
+		}
+	}
+
 	@Nullable
 	@Override
 	public E poll() {


[2/8] flink git commit: [hotfix] Rename PriorityQueueStateType.ROCKS into PriorityQueueStateType.ROCKSDB

Posted by tr...@apache.org.
[hotfix] Rename PriorityQueueStateType.ROCKS into PriorityQueueStateType.ROCKSDB


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8db5ca6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8db5ca6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8db5ca6b

Branch: refs/heads/master
Commit: 8db5ca6b0da13f7bd8dab3edee127c50cf26109b
Parents: a4b4cb7
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 09:31:06 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java  | 2 +-
 .../org/apache/flink/contrib/streaming/state/RocksDBOptions.java | 4 ++--
 .../flink/contrib/streaming/state/RocksDBStateBackend.java       | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0697463..72aa4fb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -324,7 +324,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			case HEAP:
 				this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
 				break;
-			case ROCKS:
+			case ROCKSDB:
 				this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
 				break;
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 37eb6cf..18f9ec9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
-import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;
+import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKSDB;
 
 /**
  * Configuration options for the RocksDB backend.
@@ -43,5 +43,5 @@ public class RocksDBOptions {
 		.key("state.backend.rocksdb.timer-service.impl")
 		.defaultValue(HEAP.name())
 		.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
-			"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
+			"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKSDB.name()));
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8db5ca6b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 58e8de6..c6b50cd 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -83,7 +83,7 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
 	 */
 	public enum PriorityQueueStateType {
 		HEAP,
-		ROCKS
+		ROCKSDB
 	}
 
 	private static final long serialVersionUID = 1L;


[6/8] flink git commit: [hotfix] Add EXCLUSIONS set to ConfigOptionsDocGenerator

Posted by tr...@apache.org.
[hotfix] Add EXCLUSIONS set to ConfigOptionsDocGenerator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a88d6efc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a88d6efc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a88d6efc

Branch: refs/heads/master
Commit: a88d6efc5d2280367e27c3df46451bd161a4d100
Parents: dbddf00
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 16 13:11:14 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../ConfigOptionsDocGenerator.java              | 23 +++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a88d6efc/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 743f49f..953122f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -26,8 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import static org.apache.flink.docs.util.Utils.escapeCharacters;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -36,15 +34,20 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.docs.util.Utils.escapeCharacters;
+
 /**
  * Class used for generating code based documentation of configuration parameters.
  */
@@ -56,9 +59,13 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
-		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
+		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
 	};
 
+	static final Set<String> EXCLUSIONS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+		"org.apache.flink.configuration.ConfigOptions",
+		"org.apache.flink.contrib.streaming.state.PredefinedOptions")));
+
 	static final String DEFAULT_PATH_PREFIX = "src/main/java";
 
 	@VisibleForTesting
@@ -149,9 +156,13 @@ public class ConfigOptionsDocGenerator {
 			for (Path entry : stream) {
 				String fileName = entry.getFileName().toString();
 				Matcher matcher = CLASS_NAME_PATTERN.matcher(fileName);
-				if (!fileName.equals("ConfigOptions.java") && matcher.matches()) {
-					Class<?> optionsClass = Class.forName(packageName + '.' + matcher.group(CLASS_NAME_GROUP));
-					classConsumer.accept(optionsClass);
+				if (matcher.matches()) {
+					final String className = packageName + '.' + matcher.group(CLASS_NAME_GROUP);
+
+					if (!EXCLUSIONS.contains(className)) {
+						Class<?> optionsClass = Class.forName(className);
+						classConsumer.accept(optionsClass);
+					}
 				}
 			}
 		}


[4/8] flink git commit: [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index 3a348a9..72c70bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
@@ -204,7 +204,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 * @param keyContext the key context.
 	 * @param metaInfo   the meta information, including the type serializer for state copy-on-write.
 	 */
-	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+	CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 		this(keyContext, metaInfo, 1024);
 	}
 
@@ -217,7 +217,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 * @throws IllegalArgumentException when the capacity is less than zero.
 	 */
 	@SuppressWarnings("unchecked")
-	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+	private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo, int capacity) {
 		super(keyContext, metaInfo);
 
 		// initialized tables to EMPTY_TABLE.
@@ -547,12 +547,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	}
 
 	@Override
-	public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
+	public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
 		return metaInfo;
 	}
 
 	@Override
-	public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+	public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 		this.metaInfo = metaInfo;
 	}
 
@@ -871,8 +871,9 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
 	 *
 	 * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing.
 	 */
+	@Nonnull
 	@Override
-	public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
+	public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
 		return new CopyOnWriteStateTableSnapshot<>(this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
index 4c0ab6f..f3f21dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyGroupPartitioner;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -91,7 +91,7 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 	 * to an output as part of checkpointing.
 	 */
 	@Nullable
-	private StateSnapshot.KeyGroupPartitionedSnapshot partitionedStateTableSnapshot;
+	private StateKeyGroupWriter partitionedStateTableSnapshot;
 
 	/**
 	 * Creates a new {@link CopyOnWriteStateTableSnapshot}.
@@ -135,7 +135,7 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 	@Nonnull
 	@SuppressWarnings("unchecked")
 	@Override
-	public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+	public StateKeyGroupWriter getKeyGroupWriter() {
 
 		if (partitionedStateTableSnapshot == null) {
 
@@ -160,6 +160,12 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
 		return partitionedStateTableSnapshot;
 	}
 
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+		return owningStateTable.metaInfo.snapshot();
+	}
+
 	@Override
 	public void release() {
 		owningStateTable.releaseSnapshot(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 495dfe0..2c6101e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -49,22 +49,25 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.PriorityComparator;
-import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -108,19 +111,47 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
 		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
 
+	@SuppressWarnings("unchecked")
 	@Nonnull
 	@Override
-	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementPriorityComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor) {
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+		final StateSnapshotRestore snapshotRestore = registeredStates.get(stateName);
+
+		if (snapshotRestore instanceof HeapPriorityQueueSnapshotRestoreWrapper) {
+			//TODO Serializer upgrade story!?
+			return ((HeapPriorityQueueSnapshotRestoreWrapper<T>) snapshotRestore).getPriorityQueue();
+		} else if (snapshotRestore != null) {
+			throw new IllegalStateException("Already found a different state type registered under this name: " + snapshotRestore.getClass());
+		}
+
+		final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+			new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
 
-		return priorityQueueSetFactory.create(
+		return createInternal(metaInfo);
+	}
+
+	@Nonnull
+	private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> createInternal(
+		RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
+
+		final String stateName = metaInfo.getName();
+		final HeapPriorityQueueSet<T> priorityQueue = priorityQueueSetFactory.create(
 			stateName,
-			byteOrderedElementSerializer,
-			elementPriorityComparator,
-			keyExtractor);
+			metaInfo.getElementSerializer());
+
+		HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper =
+			new HeapPriorityQueueSnapshotRestoreWrapper<>(
+				priorityQueue,
+				metaInfo,
+				KeyExtractorFunction.forKeyedObjects(),
+				keyGroupRange,
+				numberOfKeyGroups);
+
+		registeredStates.put(stateName, wrapper);
+		return priorityQueue;
 	}
 
 	private interface StateFactory {
@@ -131,14 +162,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	/**
-	 * Map of state tables that stores all state of key/value states. We store it centrally so
-	 * that we can easily checkpoint/restore it.
-	 *
-	 * <p>The actual parameters of StateTable are {@code StateTable<NamespaceT, Map<KeyT, StateT>>}
-	 * but we can't put them here because different key/value states with different types and
-	 * namespace types share this central list of tables.
+	 * Map of registered states for snapshot/restore.
 	 */
-	private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+	private final Map<String, StateSnapshotRestore> registeredStates = new HashMap<>();
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -161,7 +187,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Factory for state that is organized as priority queue.
 	 */
-	private final PriorityQueueSetFactory priorityQueueSetFactory;
+	private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
 
 	public HeapKeyedStateBackend(
 			TaskKvStateRegistry kvStateRegistry,
@@ -172,7 +198,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			boolean asynchronousSnapshots,
 			ExecutionConfig executionConfig,
 			LocalRecoveryConfig localRecoveryConfig,
-			PriorityQueueSetFactory priorityQueueSetFactory,
+			HeapPriorityQueueSetFactory priorityQueueSetFactory,
 			TtlTimeProvider ttlTimeProvider) {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader,
@@ -197,9 +223,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) throws StateMigrationException {
 
 		@SuppressWarnings("unchecked")
-		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredStates.get(stateDesc.getName());
 
-		RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo;
+		RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo;
 		if (stateTable != null) {
 			@SuppressWarnings("unchecked")
 			StateMetaInfoSnapshot restoredMetaInfoSnapshot =
@@ -210,37 +236,43 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
 					" but its corresponding restored snapshot cannot be found.");
 
-			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
 				restoredMetaInfoSnapshot,
 				namespaceSerializer,
 				stateDesc);
 
 			stateTable.setMetaInfo(newMetaInfo);
 		} else {
-			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
 				stateDesc.getType(),
 				stateDesc.getName(),
 				namespaceSerializer,
 				stateDesc.getSerializer());
 
 			stateTable = snapshotStrategy.newStateTable(newMetaInfo);
-			stateTables.put(stateDesc.getName(), stateTable);
+			registeredStates.put(stateDesc.getName(), stateTable);
 		}
 
 		return stateTable;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public <N> Stream<K> getKeys(String state, N namespace) {
-		if (!stateTables.containsKey(state)) {
-			return Stream.empty();
-		}
-		StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateTables.get(state);
+		// One lookup is enough: instanceof is false for null, so this single check covers both
+		// unregistered names and registered states that are not key/value state tables
+		// (e.g. priority queues), which have no keys to enumerate here.
+		final StateSnapshotRestore stateSnapshotRestore = registeredStates.get(state);
+		if (!(stateSnapshotRestore instanceof StateTable)) {
+			return Stream.empty();
+		}
+		final StateTable<K, N, ?> table = (StateTable<K, N, ?>) stateSnapshotRestore;
 		return table.getKeys(namespace);
 	}
 
+	/** Returns true if at least one state (key/value table or priority queue) is registered. */
 	private boolean hasRegisteredState() {
-		return !stateTables.isEmpty();
+		return !registeredStates.isEmpty();
 	}
 
 	@Override
@@ -288,7 +320,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		final Map<Integer, String> kvStatesById = new HashMap<>();
 		int numRegisteredKvStates = 0;
-		stateTables.clear();
+		registeredStates.clear();
 
 		boolean keySerializerRestored = false;
 
@@ -342,16 +374,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
 					restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
-					StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
+					StateSnapshotRestore snapshotRestore = registeredStates.get(restoredMetaInfo.getName());
 
 					//important: only create a new table we did not already create it previously
-					if (null == stateTable) {
+					if (null == snapshotRestore) {
 
-						RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
-								new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+						if (restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE) {
+							RegisteredKeyValueStateBackendMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+								new RegisteredKeyValueStateBackendMetaInfo<>(restoredMetaInfo);
 
-						stateTable = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
-						stateTables.put(restoredMetaInfo.getName(), stateTable);
+							snapshotRestore = snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo);
+							registeredStates.put(restoredMetaInfo.getName(), snapshotRestore);
+						} else {
+							createInternal(new RegisteredPriorityQueueStateBackendMetaInfo<>(restoredMetaInfo));
+						}
 						kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
 						++numRegisteredKvStates;
 					} else {
@@ -384,12 +420,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 						for (int i = 0; i < restoredMetaInfos.size(); i++) {
 							int kvStateId = kgCompressionInView.readShort();
-							StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
+							StateSnapshotRestore registeredState = registeredStates.get(kvStatesById.get(kvStateId));
 
-							StateTableByKeyGroupReader keyGroupReader =
-								StateTableByKeyGroupReaders.readerForVersion(
-									stateTable,
-									serializationProxy.getReadVersion());
+							StateSnapshotKeyGroupReader keyGroupReader =
+								registeredState.keyGroupReader(serializationProxy.getReadVersion());
 
 							keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex);
 						}
@@ -446,8 +480,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@Override
 	public int numStateEntries() {
-		int sum = 0;
-		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
-			sum += stateTable.size();
-		}
-		return sum;
+		// Only key/value state tables hold entries; other registered states (e.g. priority queues) are skipped.
+		return registeredStates.values().stream()
+			.filter(StateTable.class::isInstance)
+			.mapToInt(registered -> ((StateTable<?, ?, ?>) registered).size())
+			.sum();
 	}
@@ -458,8 +494,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@VisibleForTesting
 	public int numStateEntries(Object namespace) {
-		int sum = 0;
-		for (StateTable<K, ?, ?> stateTable : stateTables.values()) {
-			sum += stateTable.sizeOfNamespace(namespace);
-		}
-		return sum;
+		// Count entries of the given namespace across all key/value state tables; other registered
+		// states (e.g. priority queues) do not contribute.
+		return registeredStates.values().stream()
+			.filter(StateTable.class::isInstance)
+			.mapToInt(registered -> ((StateTable<?, ?, ?>) registered).sizeOfNamespace(namespace))
+			.sum();
 	}
@@ -486,7 +524,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		boolean isAsynchronous();
 
-		<N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo);
+		<N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo);
 	}
 
 	private class AsyncSnapshotStrategySynchronicityBehavior implements SnapshotStrategySynchronicityBehavior<K> {
@@ -503,7 +541,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
+		/** Creates a {@link CopyOnWriteStateTable} for the given meta info, using the enclosing backend as key context. */
 		@Override
-		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
 			return new CopyOnWriteStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
 		}
 	}
@@ -522,7 +560,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
+		/** Creates a {@link NestedMapsStateTable} for the given meta info, using the enclosing backend as key context. */
 		@Override
-		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
 			return new NestedMapsStateTable<>(HeapKeyedStateBackend.this, newMetaInfo);
 		}
 	}
@@ -554,25 +592,26 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			long syncStartTime = System.currentTimeMillis();
 
-			Preconditions.checkState(stateTables.size() <= Short.MAX_VALUE,
-				"Too many KV-States: " + stateTables.size() +
+			Preconditions.checkState(registeredStates.size() <= Short.MAX_VALUE,
+				"Too many KV-States: " + registeredStates.size() +
 					". Currently at most " + Short.MAX_VALUE + " states are supported");
 
 			List<StateMetaInfoSnapshot> metaInfoSnapshots =
-				new ArrayList<>(stateTables.size());
+				new ArrayList<>(registeredStates.size());
 
-			final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
+			final Map<String, Integer> kVStateToId = new HashMap<>(registeredStates.size());
 
 			final Map<String, StateSnapshot> cowStateStableSnapshots =
-				new HashMap<>(stateTables.size());
+				new HashMap<>(registeredStates.size());
 
-			for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
+			for (Map.Entry<String, StateSnapshotRestore> kvState : registeredStates.entrySet()) {
 				String stateName = kvState.getKey();
 				kVStateToId.put(stateName, kVStateToId.size());
-				StateTable<K, ?, ?> stateTable = kvState.getValue();
-				if (null != stateTable) {
-					metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
-					cowStateStableSnapshots.put(stateName, stateTable.createSnapshot());
+				StateSnapshotRestore state = kvState.getValue();
+				if (null != state) {
+					final StateSnapshot stateSnapshot = state.stateSnapshot();
+					metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
+					cowStateStableSnapshots.put(stateName, stateSnapshot);
 				}
 			}
 
@@ -654,13 +693,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							outView.writeInt(keyGroupId);
 
 							for (Map.Entry<String, StateSnapshot> kvState : cowStateStableSnapshots.entrySet()) {
-								StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot =
-									kvState.getValue().partitionByKeyGroup();
+								StateSnapshot.StateKeyGroupWriter partitionedSnapshot =
+									kvState.getValue().getKeyGroupWriter();
 								try (OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(localStream)) {
 									String stateName = kvState.getKey();
 									DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
 									kgCompressionView.writeShort(kVStateToId.get(stateName));
-									partitionedSnapshot.writeMappingsInKeyGroup(kgCompressionView, keyGroupId);
+									partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);
 								} // this will just close the outer compression stream
 							}
 						}
@@ -705,7 +744,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
+		/** Delegates to the configured synchronicity behavior, which decides the concrete {@link StateTable} type. */
 		@Override
-		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
+		public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
 			return snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
index e5f610e..22b2419 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -50,7 +50,8 @@ import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
  *
  * @param <T> type of the contained elements.
  */
-public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement>
+	implements InternalPriorityQueue<T> {
 
 	/**
 	 * The index of the head element in the array that represents the heap.

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
index ee6fda9..b0255d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 
@@ -29,7 +30,7 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
 /**
- *
+ * Factory for {@link HeapPriorityQueueSet}.
  */
 public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
@@ -54,14 +55,13 @@ public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
 	@Nonnull
 	@Override
-	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementPriorityComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor) {
-		return new HeapPriorityQueueSet<>(
-			elementPriorityComparator,
-			keyExtractor,
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+		return new HeapPriorityQueueSet<T>(
+			PriorityComparator.forPriorityComparableObjects(),
+			KeyExtractorFunction.forKeyedObjects(),
 			minimumCapacity,
 			keyGroupRange,
 			totalKeyGroups);

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
new file mode 100644
index 0000000..5fd67f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a {@link HeapPriorityQueueSet} together with its backend meta info, the local key-group range and the
+ * total number of key-groups. This lets the heap backend snapshot and restore the queue through the
+ * {@link StateSnapshotRestore} interface.
+ *
+ * @param <T> type of the queue elements.
+ */
+public class HeapPriorityQueueSnapshotRestoreWrapper<T extends HeapPriorityQueueElement>
+	implements StateSnapshotRestore {
+
+	/** The wrapped queue; elements read during restore are added to it directly. */
+	@Nonnull
+	private final HeapPriorityQueueSet<T> priorityQueue;
+	/** Extracts the key of an element; passed on to the created snapshots. */
+	@Nonnull
+	private final KeyExtractorFunction<T> keyExtractorFunction;
+	/** Meta info (name, element serializer) of the wrapped queue. */
+	@Nonnull
+	private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+	/** The local key-group range; passed on to the created snapshots. */
+	@Nonnull
+	private final KeyGroupRange localKeyGroupRange;
+	/** The total number of key-groups; passed on to the created snapshots. */
+	@Nonnegative
+	private final int totalKeyGroups;
+
+	/**
+	 * @param priorityQueue the queue to wrap.
+	 * @param metaInfo meta info of the queue.
+	 * @param keyExtractorFunction extracts the key of a queue element.
+	 * @param localKeyGroupRange the local key-group range.
+	 * @param totalKeyGroups the total number of key-groups.
+	 */
+	public HeapPriorityQueueSnapshotRestoreWrapper(
+		@Nonnull HeapPriorityQueueSet<T> priorityQueue,
+		@Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
+		@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
+		@Nonnull KeyGroupRange localKeyGroupRange,
+		int totalKeyGroups) {
+
+		this.priorityQueue = priorityQueue;
+		this.keyExtractorFunction = keyExtractorFunction;
+		this.metaInfo = metaInfo;
+		this.localKeyGroupRange = localKeyGroupRange;
+		this.totalKeyGroups = totalKeyGroups;
+	}
+
+	/**
+	 * Creates a snapshot from a copy of the queue's current elements.
+	 * Unless the element serializer reports an immutable type, each element is also deep-copied with that
+	 * serializer. The meta info is deep-copied as well.
+	 */
+	@SuppressWarnings("unchecked")
+	@Nonnull
+	@Override
+	public StateSnapshot stateSnapshot() {
+		// T is erased, so toArray receives an array of the bound HeapPriorityQueueElement, which is then cast.
+		final T[] queueDump = (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]);
+
+		final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+
+		// turn the flat copy into a deep copy if required.
+		if (!elementSerializer.isImmutableType()) {
+			for (int i = 0; i < queueDump.length; ++i) {
+				queueDump[i] = elementSerializer.copy(queueDump[i]);
+			}
+		}
+
+		return new HeapPriorityQueueStateSnapshot<>(
+			queueDump,
+			keyExtractorFunction,
+			metaInfo.deepCopy(),
+			localKeyGroupRange,
+			totalKeyGroups);
+	}
+
+	/**
+	 * Returns a reader that deserializes elements with the element serializer and adds each one to the wrapped
+	 * queue. The key-group id reported for an element is not used.
+	 *
+	 * @param readVersionHint currently ignored by this implementation.
+	 */
+	@Nonnull
+	@Override
+	public StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint) {
+		final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+		return KeyGroupPartitioner.createKeyGroupPartitionReader(
+			elementSerializer::deserialize, //we know that this does not deliver nulls, because we never write nulls
+			(element, keyGroupId) -> priorityQueue.add(element));
+	}
+
+	/** Returns the wrapped queue itself (not a copy). */
+	@Nonnull
+	public HeapPriorityQueueSet<T> getPriorityQueue() {
+		return priorityQueue;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
new file mode 100644
index 0000000..18e7d54
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueStateSnapshot.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+
+/**
+ * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
+ * It holds a copy of the queue's elements, which is partitioned by key-group when it is written.
+ *
+ * @param <T> type of the state elements.
+ */
+public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
+
+	/** Function that extracts keys from elements. */
+	@Nonnull
+	private final KeyExtractorFunction<T> keyExtractor;
+
+	/** Copy of the heap array containing all the (immutable or deeply copied) elements. */
+	@Nonnull
+	private final T[] heapArrayCopy;
+
+	/** The meta info of the state. */
+	@Nonnull
+	private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+
+	/** The key-group range covered by this snapshot. */
+	@Nonnull
+	private final KeyGroupRange keyGroupRange;
+
+	/** The total number of key-groups in the job. */
+	@Nonnegative
+	private final int totalKeyGroups;
+
+	/** Result of partitioning the snapshot by key-group; created lazily by {@link #getKeyGroupWriter()}. */
+	@Nullable
+	private StateKeyGroupWriter stateKeyGroupWriter;
+
+	/**
+	 * @param heapArrayCopy copy of the queue's elements; it is stored as-is, not copied again.
+	 * @param keyExtractor extracts the key of an element for key-group assignment.
+	 * @param metaInfo meta info of the state.
+	 * @param keyGroupRange the key-group range covered by this snapshot.
+	 * @param totalKeyGroups the total number of key-groups.
+	 */
+	HeapPriorityQueueStateSnapshot(
+		@Nonnull T[] heapArrayCopy,
+		@Nonnull KeyExtractorFunction<T> keyExtractor,
+		@Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int totalKeyGroups) {
+
+		this.keyExtractor = keyExtractor;
+		this.heapArrayCopy = heapArrayCopy;
+		this.metaInfo = metaInfo;
+		this.keyGroupRange = keyGroupRange;
+		this.totalKeyGroups = totalKeyGroups;
+	}
+
+	/**
+	 * Returns a writer for the elements partitioned by key-group.
+	 * Partitioning happens on the first call; the resulting writer is cached and returned by later calls.
+	 */
+	@SuppressWarnings("unchecked")
+	@Nonnull
+	@Override
+	public StateKeyGroupWriter getKeyGroupWriter() {
+
+		if (stateKeyGroupWriter == null) {
+
+			// Output array with the same runtime component type and length as the element copy.
+			T[] partitioningOutput = (T[]) Array.newInstance(
+				heapArrayCopy.getClass().getComponentType(),
+				heapArrayCopy.length);
+
+			final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+
+			KeyGroupPartitioner<T> keyGroupPartitioner =
+				new KeyGroupPartitioner<>(
+					heapArrayCopy,
+					heapArrayCopy.length,
+					partitioningOutput,
+					keyGroupRange,
+					totalKeyGroups,
+					keyExtractor,
+					elementSerializer::serialize);
+
+			stateKeyGroupWriter = keyGroupPartitioner.partitionByKeyGroup();
+		}
+
+		return stateKeyGroupWriter;
+	}
+
+	/** Returns a snapshot of this snapshot's meta info. */
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+		return metaInfo.snapshot();
+	}
+
+	/** Intentionally empty: this snapshot only holds object references (element copies, meta info), nothing to release. */
+	@Override
+	public void release() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
index 6f4f911..d8b0a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -49,6 +49,8 @@ import java.util.function.Predicate;
 public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
 	implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> {
 
+	/**
+	 * Selects the {@link #bulkPoll(Predicate, Consumer)} strategy.
+	 * When false (the current value), elements are polled one at a time in strict order.
+	 * When true, whole key-group sub-heaps are bulk-polled, trading global order for fewer heap adjustments.
+	 */
+	static final boolean ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION = false;
+
 	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
 	@Nonnull
 	private final HeapPriorityQueue<PQ> heapOfkeyGroupedHeaps;
@@ -94,6 +96,22 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
 
 	@Override
 	public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+		// Dispatch on a compile-time constant; only one of the two branches is ever taken in a build.
+		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+			bulkPollRelaxedOrder(canConsume, consumer);
+		} else {
+			bulkPollStrictOrder(canConsume, consumer);
+		}
+	}
+
+	/**
+	 * Relaxed-order variant of {@link #bulkPoll(Predicate, Consumer)}.
+	 * It repeatedly takes the key-group sub-heap at the top of {@link #heapOfkeyGroupedHeaps}, lets it
+	 * bulk-poll its consumable elements, and re-adjusts its position in the outer heap.
+	 * Elements are therefore emitted key-group by key-group rather than in strict global order.
+	 */
+	private void bulkPollRelaxedOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+		PQ headList = heapOfkeyGroupedHeaps.peek();
+		T headElement;
+		while (headList != null && (headElement = headList.peek()) != null && canConsume.test(headElement)) {
+			headList.bulkPoll(canConsume, consumer);
+			heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
+			// Re-read the top: after draining, another key-group may now hold the next consumable element.
+			// Without this the loop re-tests the already drained sub-heap and stops after one key-group.
+			headList = heapOfkeyGroupedHeaps.peek();
+		}
+	}
+
+	private void bulkPollStrictOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
 		T element;
 		while ((element = peek()) != null && canConsume.test(element)) {
 			poll();

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 18551b5..efed1cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -22,9 +22,10 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -34,6 +35,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Stream;
 
 /**
@@ -69,7 +71,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 	 * @param keyContext the key context.
 	 * @param metaInfo the meta information for this state table.
 	 */
-	public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+	public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 		super(keyContext, metaInfo);
 		this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
 
@@ -175,7 +177,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 	@Override
 	public Stream<K> getKeys(N namespace) {
+		// Skip key-groups without a map, look up the namespace in each remaining map (empty if absent),
+		// and stream the keys of every match.
 		return Arrays.stream(state)
-			.filter(namespaces -> namespaces != null)
-			.map(namespaces -> namespaces.getOrDefault(namespace, Collections.emptyMap()))
-			.flatMap(namespaceSate -> namespaceSate.keySet().stream());
+			.filter(Objects::nonNull)
+			.map(namespaceMap -> namespaceMap.getOrDefault(namespace, Collections.emptyMap()))
+			.flatMap(keyedMap -> keyedMap.keySet().stream());
 	}
@@ -232,12 +234,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 			setMapForKeyGroup(keyGroupIndex, namespaceMap);
 		}
 
-		Map<K, S> keyedMap = namespaceMap.get(namespace);
-
-		if (keyedMap == null) {
-			keyedMap = new HashMap<>();
-			namespaceMap.put(namespace, keyedMap);
-		}
+		Map<K, S> keyedMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
 
 		return keyedMap.put(key, value);
 	}
@@ -302,13 +299,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 			setMapForKeyGroup(keyGroupIndex, namespaceMap);
 		}
 
-		Map<K, S> keyedMap = namespaceMap.get(namespace);
-
-		if (keyedMap == null) {
-			keyedMap = new HashMap<>();
-			namespaceMap.put(namespace, keyedMap);
-		}
-
+		Map<K, S> keyedMap = namespaceMap.computeIfAbsent(namespace, k -> new HashMap<>());
 		keyedMap.put(key, transformation.apply(keyedMap.get(key), value));
 	}
 
@@ -323,8 +314,9 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 		return count;
 	}
 
+	/**
+	 * Creates a {@link NestedMapsStateTableSnapshot} for this table.
+	 * The snapshot is constructed with a reference to this table rather than with a copy of its contents.
+	 */
+	@Nonnull
 	@Override
-	public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
+	public NestedMapsStateTableSnapshot<K, N, S> stateSnapshot() {
 		return new NestedMapsStateTableSnapshot<>(this);
 	}
 
@@ -337,7 +329,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 	 */
 	static class NestedMapsStateTableSnapshot<K, N, S>
 			extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>>
-			implements StateSnapshot.KeyGroupPartitionedSnapshot {
+			implements StateSnapshot.StateKeyGroupWriter {
 
 		NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
 			super(owningTable);
@@ -345,10 +337,16 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 
 		@Nonnull
 		@Override
-		public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+		public StateKeyGroupWriter getKeyGroupWriter() {
+			// This snapshot implements StateKeyGroupWriter itself, so no separate writer object is needed.
 			return this;
 		}
 
+		/** Snapshots the owning table's meta info as it is at the time this method is called. */
+		@Nonnull
+		@Override
+		public StateMetaInfoSnapshot getMetaInfoSnapshot() {
+			return owningStateTable.metaInfo.snapshot();
+		}
+
 		/**
 		 * Implementation note: we currently chose the same format between {@link NestedMapsStateTable} and
 		 * {@link CopyOnWriteStateTable}.
@@ -359,7 +357,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
 		 * implementations).
 		 */
 		@Override
-		public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+		public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
 			final Map<N, Map<K, S>> keyGroupMap = owningStateTable.getMapForKeyGroup(keyGroupId);
 			if (null != keyGroupMap) {
 				TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index de2290a..58a2f97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.stream.Stream;
 
 /**
@@ -35,7 +38,7 @@ import java.util.stream.Stream;
  * @param <N> type of namespace
  * @param <S> type of state
  */
-public abstract class StateTable<K, N, S> {
+public abstract class StateTable<K, N, S> implements StateSnapshotRestore {
 
 	/**
 	 * The key context view on the backend. This provides information, such as the currently active key.
@@ -45,14 +48,14 @@ public abstract class StateTable<K, N, S> {
 	/**
 	 * Combined meta information such as name and serializers for this state
 	 */
-	protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo;
+	protected RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo;
 
 	/**
 	 *
 	 * @param keyContext the key context provides the key scope for all put/get/delete operations.
 	 * @param metaInfo the meta information, including the type serializer for state copy-on-write.
 	 */
-	public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+	public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 		this.keyContext = Preconditions.checkNotNull(keyContext);
 		this.metaInfo = Preconditions.checkNotNull(metaInfo);
 	}
@@ -173,22 +176,26 @@ public abstract class StateTable<K, N, S> {
 		return metaInfo.getNamespaceSerializer();
 	}
 
-	public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
+	public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
 		return metaInfo;
 	}
 
-	public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
+	public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 		this.metaInfo = metaInfo;
 	}
 
 	// Snapshot / Restore -------------------------------------------------------------------------
 
-	abstract StateSnapshot createSnapshot();
-
 	public abstract void put(K key, int keyGroup, N namespace, S state);
 
 	// For testing --------------------------------------------------------------------------------
 
 	@VisibleForTesting
 	public abstract int sizeOfNamespace(Object namespace);
+
+	@Nonnull
+	@Override
+	public StateSnapshotKeyGroupReader keyGroupReader(int readVersion) {
+		return StateTableByKeyGroupReaders.readerForVersion(this, readVersion);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
deleted file mode 100644
index 659c174..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReader.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.IOException;
-
-/**
- * Interface for state de-serialization into {@link StateTable}s by key-group.
- */
-interface StateTableByKeyGroupReader {
-
-	/**
-	 * Read the data for the specified key-group from the input.
-	 *
-	 * @param div        the input
-	 * @param keyGroupId the key-group to write
-	 * @throws IOException on write related problems
-	 */
-	void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index e08e90e..2f83857 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -19,12 +19,18 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
 
 /**
- * This class provides a static factory method to create different implementations of {@link StateTableByKeyGroupReader}
+ * This class provides a static factory method to create different implementations of {@link StateSnapshotKeyGroupReader}
  * depending on the provided serialization format version.
  * <p>
  * The implementations are also located here as inner classes.
@@ -35,69 +41,58 @@ class StateTableByKeyGroupReaders {
 	 * Creates a new StateTableByKeyGroupReader that inserts de-serialized mappings into the given table, using the
 	 * de-serialization algorithm that matches the given version.
 	 *
-	 * @param table the {@link StateTable} into which de-serialized mappings are inserted.
+	 * @param stateTable the {@link StateTable} into which de-serialized mappings are inserted.
 	 * @param version version for the de-serialization algorithm.
 	 * @param <K> type of key.
 	 * @param <N> type of namespace.
 	 * @param <S> type of state.
 	 * @return the appropriate reader.
 	 */
-	static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
+	static <K, N, S> StateSnapshotKeyGroupReader readerForVersion(StateTable<K, N, S> stateTable, int version) {
 		switch (version) {
 			case 1:
-				return new StateTableByKeyGroupReaderV1<>(table);
+				return new StateTableByKeyGroupReaderV1<>(stateTable);
 			case 2:
 			case 3:
 			case 4:
 			case 5:
-				return new StateTableByKeyGroupReaderV2V3<>(table);
+				return createV2PlusReader(stateTable);
 			default:
 				throw new IllegalArgumentException("Unknown version: " + version);
 		}
 	}
 
-	static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
-			implements StateTableByKeyGroupReader {
-
-		protected final StateTable<K, N, S> stateTable;
-
-		AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
-			this.stateTable = stateTable;
-		}
-
-		@Override
-		public abstract void readMappingsInKeyGroup(DataInputView div, int keyGroupId) throws IOException;
-
-		protected TypeSerializer<K> getKeySerializer() {
-			return stateTable.keyContext.getKeySerializer();
-		}
-
-		protected TypeSerializer<N> getNamespaceSerializer() {
-			return stateTable.getNamespaceSerializer();
-		}
-
-		protected TypeSerializer<S> getStateSerializer() {
-			return stateTable.getStateSerializer();
-		}
+	private static <K, N, S> StateSnapshotKeyGroupReader createV2PlusReader(StateTable<K, N, S> stateTable) {
+		final TypeSerializer<K> keySerializer = stateTable.keyContext.getKeySerializer();
+		final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
+		final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
+		final Tuple3<N, K, S> buffer = new Tuple3<>();
+		return KeyGroupPartitioner.createKeyGroupPartitionReader((in) -> {
+			buffer.f0 = namespaceSerializer.deserialize(in);
+			buffer.f1 = keySerializer.deserialize(in);
+			buffer.f2 = stateSerializer.deserialize(in);
+			return buffer;
+		}, (element, keyGroupId1) -> stateTable.put(element.f1, keyGroupId1, element.f0, element.f2));
 	}
 
-	static final class StateTableByKeyGroupReaderV1<K, N, S>
-			extends AbstractStateTableByKeyGroupReader<K, N, S> {
+	static final class StateTableByKeyGroupReaderV1<K, N, S> implements StateSnapshotKeyGroupReader {
+
+		protected final StateTable<K, N, S> stateTable;
 
 		StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
-			super(stateTable);
+			this.stateTable = stateTable;
 		}
 
 		@Override
-		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
+		public void readMappingsInKeyGroup(@Nonnull DataInputView inView, @Nonnegative int keyGroupId) throws IOException {
 
 			if (inView.readByte() == 0) {
 				return;
 			}
 
-			final TypeSerializer<K> keySerializer = getKeySerializer();
-			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
-			final TypeSerializer<S> stateSerializer = getStateSerializer();
+			final TypeSerializer<K> keySerializer = stateTable.keyContext.getKeySerializer();
+			final TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
+			final TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
 
 			// V1 uses kind of namespace compressing format
 			int numNamespaces = inView.readInt();
@@ -112,28 +107,4 @@ class StateTableByKeyGroupReaders {
 			}
 		}
 	}
-
-	private static final class StateTableByKeyGroupReaderV2V3<K, N, S>
-			extends AbstractStateTableByKeyGroupReader<K, N, S> {
-
-		StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) {
-			super(stateTable);
-		}
-
-		@Override
-		public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
-
-			final TypeSerializer<K> keySerializer = getKeySerializer();
-			final TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
-			final TypeSerializer<S> stateSerializer = getStateSerializer();
-
-			int numKeys = inView.readInt();
-			for (int i = 0; i < numKeys; ++i) {
-				N namespace = namespaceSerializer.deserialize(inView);
-				K key = keySerializer.deserialize(inView);
-				S state = stateSerializer.deserialize(inView);
-				stateTable.put(key, keyGroupId, namespace, state);
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 9341a5a..5a3190c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.metainfo;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import java.util.Map;
 
 /**
  * Generalized snapshot for meta information about one state in a state backend
- * (e.g. {@link org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo}).
+ * (e.g. {@link RegisteredKeyValueStateBackendMetaInfo}).
  */
 public class StateMetaInfoSnapshot {
 
@@ -41,7 +42,7 @@ public class StateMetaInfoSnapshot {
 		KEY_VALUE,
 		OPERATOR,
 		BROADCAST,
-		TIMER
+		PRIORITY_QUEUE
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
index ce535ef..926e75f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -39,6 +39,8 @@ import java.util.Map;
  */
 public class StateMetaInfoSnapshotReadersWriters {
 
+	private StateMetaInfoSnapshotReadersWriters() {}
+
 	/**
 	 * Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
 	 * - v5: Flink 1.6.x
@@ -74,23 +76,35 @@ public class StateMetaInfoSnapshotReadersWriters {
 	@Nonnull
 	public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
 
+		if (readVersion < CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+			switch (stateTypeHint) {
+				case KEYED_STATE:
+					return getLegacyKeyedStateMetaInfoReader(readVersion);
+				case OPERATOR_STATE:
+					return getLegacyOperatorStateMetaInfoReader(readVersion);
+				default:
+					throw new IllegalArgumentException("Unsupported state type hint: " + stateTypeHint +
+						" with version " + readVersion);
+			}
+		} else {
+			return getReader(readVersion);
+		}
+	}
+
+	/**
+	 * Returns a reader for {@link StateMetaInfoSnapshot} with the requested version number.
+	 *
+	 * @param readVersion the format version to read.
+	 * @return the requested reader.
+	 */
+	@Nonnull
+	public static StateMetaInfoReader getReader(int readVersion) {
 		if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
 			// latest version shortcut
 			return CurrentReaderImpl.INSTANCE;
-		}
-
-		if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+		} else {
 			throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
 		}
-
-		switch (stateTypeHint) {
-			case KEYED_STATE:
-				return getLegacyKeyedStateMetaInfoReader(readVersion);
-			case OPERATOR_STATE:
-				return getLegacyOperatorStateMetaInfoReader(readVersion);
-			default:
-				throw new IllegalArgumentException("Unsupported state type hint: " + stateTypeHint);
-		}
 	}
 
 	@Nonnull

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
index e6b7739..3756187 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
@@ -85,11 +85,11 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
 			new ValidatingElementWriterDummy<>(keyExtractorFunction, numberOfKeyGroups, allElementsIdentitySet);
 
 		final KeyGroupPartitioner<T> testInstance = createPartitioner(data, testSize, range, numberOfKeyGroups, validatingElementWriter);
-		final StateSnapshot.KeyGroupPartitionedSnapshot result = testInstance.partitionByKeyGroup();
+		final StateSnapshot.StateKeyGroupWriter result = testInstance.partitionByKeyGroup();
 
 		for (int keyGroup = 0; keyGroup < numberOfKeyGroups; ++keyGroup) {
 			validatingElementWriter.setCurrentKeyGroup(keyGroup);
-			result.writeMappingsInKeyGroup(DUMMY_OUT_VIEW, keyGroup);
+			result.writeStateInKeyGroup(DUMMY_OUT_VIEW, keyGroup);
 		}
 
 		validatingElementWriter.validateAllElementsSeen();

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 5241dd8..9c487a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -55,11 +55,11 @@ public class SerializationProxiesTest {
 
 		List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
 
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
@@ -93,11 +93,11 @@ public class SerializationProxiesTest {
 
 		List<StateMetaInfoSnapshot> stateMetaInfoList = new ArrayList<>();
 
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
-		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+		stateMetaInfoList.add(new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
@@ -132,7 +132,7 @@ public class SerializationProxiesTest {
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 
 		for (StateMetaInfoSnapshot snapshot : serializationProxy.getStateMetaInfoSnapshots()) {
-			final RegisteredKeyedBackendStateMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+			final RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
 			Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
 			Assert.assertTrue(restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
 			Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
@@ -147,7 +147,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		StateMetaInfoSnapshot metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+		StateMetaInfoSnapshot metaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
 
 		byte[] serialized;
@@ -173,7 +173,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
-		StateMetaInfoSnapshot snapshot = new RegisteredKeyedBackendStateMetaInfo<>(
+		StateMetaInfoSnapshot snapshot = new RegisteredKeyValueStateBackendMetaInfo<>(
 			StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
 
 		byte[] serialized;
@@ -198,7 +198,7 @@ public class SerializationProxiesTest {
 				new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredKeyedBackendStateMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(snapshot);
+		RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
 
 		Assert.assertEquals(name, restoredMetaInfo.getName());
 		Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
@@ -216,18 +216,18 @@ public class SerializationProxiesTest {
 
 		List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>();
 
-		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+		stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
 			"a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
-		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+		stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
 			"b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
-		stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+		stateMetaInfoSnapshots.add(new RegisteredOperatorStateBackendMetaInfo<>(
 			"c", stateSerializer, OperatorStateHandle.Mode.UNION).snapshot());
 
 		List<StateMetaInfoSnapshot> broadcastStateMetaInfoSnapshots = new ArrayList<>();
 
-		broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastBackendStateMetaInfo<>(
+		broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastStateBackendMetaInfo<>(
 				"d", OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot());
-		broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastBackendStateMetaInfo<>(
+		broadcastStateMetaInfoSnapshots.add(new RegisteredBroadcastStateBackendMetaInfo<>(
 				"e", OperatorStateHandle.Mode.BROADCAST, valueSerializer, keySerializer).snapshot());
 
 		OperatorBackendSerializationProxy serializationProxy =
@@ -257,7 +257,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
 		StateMetaInfoSnapshot snapshot =
-			new RegisteredOperatorBackendStateMetaInfo<>(
+			new RegisteredOperatorStateBackendMetaInfo<>(
 				name, stateSerializer, OperatorStateHandle.Mode.UNION).snapshot();
 
 		byte[] serialized;
@@ -274,8 +274,8 @@ public class SerializationProxiesTest {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
-		RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
-			new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+		RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+			new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
 
 		Assert.assertEquals(name, restoredMetaInfo.getName());
 		Assert.assertEquals(OperatorStateHandle.Mode.UNION, restoredMetaInfo.getAssignmentMode());
@@ -290,7 +290,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
 
 		StateMetaInfoSnapshot snapshot =
-			new RegisteredBroadcastBackendStateMetaInfo<>(
+			new RegisteredBroadcastStateBackendMetaInfo<>(
 				name, OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
 
 		byte[] serialized;
@@ -308,8 +308,8 @@ public class SerializationProxiesTest {
 				new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
 		}
 
-		RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
-			new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+		RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+			new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
 
 		Assert.assertEquals(name, restoredMetaInfo.getName());
 		Assert.assertEquals(
@@ -325,7 +325,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
 		StateMetaInfoSnapshot snapshot =
-			new RegisteredOperatorBackendStateMetaInfo<>(
+			new RegisteredOperatorStateBackendMetaInfo<>(
 				name, stateSerializer, OperatorStateHandle.Mode.UNION).snapshot();
 
 		byte[] serialized;
@@ -348,8 +348,8 @@ public class SerializationProxiesTest {
 			snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
-			new RegisteredOperatorBackendStateMetaInfo<>(snapshot);
+		RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+			new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
 
 		Assert.assertEquals(name, restoredMetaInfo.getName());
 		Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer);
@@ -365,7 +365,7 @@ public class SerializationProxiesTest {
 		TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
 
 		StateMetaInfoSnapshot snapshot =
-			new RegisteredBroadcastBackendStateMetaInfo<>(
+			new RegisteredBroadcastStateBackendMetaInfo<>(
 				broadcastName, OperatorStateHandle.Mode.BROADCAST, keySerializer, valueSerializer).snapshot();
 
 		byte[] serialized;
@@ -393,8 +393,8 @@ public class SerializationProxiesTest {
 			snapshot = reader.readStateMetaInfoSnapshot(new DataInputViewStreamWrapper(in), classLoader);
 		}
 
-		RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
-			new RegisteredBroadcastBackendStateMetaInfo<>(snapshot);
+		RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+			new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
 
 		Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
 		Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, restoredMetaInfo.getAssignmentMode());

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 558f629..355387d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -55,7 +56,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class),
+			mock(HeapPriorityQueueSetFactory.class),
 			TtlTimeProvider.DEFAULT);
 
 		try {
@@ -79,7 +80,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class),
+			mock(HeapPriorityQueueSetFactory.class),
 			TtlTimeProvider.DEFAULT);
 
 		try {
@@ -121,7 +122,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class),
+			mock(HeapPriorityQueueSetFactory.class),
 			TtlTimeProvider.DEFAULT);
 
 		try {
@@ -164,7 +165,7 @@ public class StateSnapshotCompressionTest extends TestLogger {
 			true,
 			executionConfig,
 			TestLocalRecoveryConfig.disabled(),
-			mock(PriorityQueueSetFactory.class),
+			mock(HeapPriorityQueueSetFactory.class),
 			TtlTimeProvider.DEFAULT);
 		try {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index cf6bcc8..2c48e4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
 import org.apache.flink.runtime.state.StateTransformationFunction;
 import org.apache.flink.util.TestLogger;
@@ -53,8 +53,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testPutGetRemoveContainsTransform() throws Exception {
-		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.UNKNOWN,
 				"test",
 				IntSerializer.INSTANCE,
@@ -125,8 +125,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testIncrementalRehash() {
-		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.UNKNOWN,
 				"test",
 				IntSerializer.INSTANCE,
@@ -170,8 +170,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	@Test
 	public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
 
-		final RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		final RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.UNKNOWN,
 				"test",
 				IntSerializer.INSTANCE,
@@ -325,8 +325,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 	 */
 	@Test
 	public void testCopyOnWriteContracts() {
-		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.UNKNOWN,
 				"test",
 				IntSerializer.INSTANCE,
@@ -356,7 +356,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 
 		// no snapshot taken, we get the original back
 		Assert.assertTrue(stateTable.get(1, 1) == originalState1);
-		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.stateSnapshot();
 		// after snapshot1 is taken, we get a copy...
 		final ArrayList<Integer> copyState = stateTable.get(1, 1);
 		Assert.assertFalse(copyState == originalState1);
@@ -370,7 +370,7 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 		Assert.assertTrue(copyState == stateTable.get(1, 1));
 
 		// we take snapshot2
-		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+		CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.stateSnapshot();
 		// after the second snapshot, copy-on-write is active again for old entries
 		Assert.assertFalse(copyState == stateTable.get(1, 1));
 		// and equality still holds
@@ -400,8 +400,8 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 		final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();
 		final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();
 
-		RegisteredKeyedBackendStateMetaInfo<Integer, Integer> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		RegisteredKeyValueStateBackendMetaInfo<Integer, Integer> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.VALUE,
 				"test",
 				namespaceSerializer,
@@ -443,15 +443,15 @@ public class CopyOnWriteStateTableTest extends TestLogger {
 		table.put(2, 0, 1, 2);
 
 
-		final CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.createSnapshot();
+		final CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.stateSnapshot();
 
 		try {
-			final StateSnapshot.KeyGroupPartitionedSnapshot partitionedSnapshot = snapshot.partitionByKeyGroup();
+			final StateSnapshot.StateKeyGroupWriter partitionedSnapshot = snapshot.getKeyGroupWriter();
 			namespaceSerializer.disable();
 			keySerializer.disable();
 			stateSerializer.disable();
 
-			partitionedSnapshot.writeMappingsInKeyGroup(
+			partitionedSnapshot.writeStateInKeyGroup(
 				new DataOutputViewStreamWrapper(
 					new ByteArrayOutputStreamWithPos(1024)), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
index 45c86c7..41ef0c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -27,8 +27,9 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,8 +47,8 @@ public class StateTableSnapshotCompatibilityTest {
 	@Test
 	public void checkCompatibleSerializationFormats() throws IOException {
 		final Random r = new Random(42);
-		RegisteredKeyedBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
-			new RegisteredKeyedBackendStateMetaInfo<>(
+		RegisteredKeyValueStateBackendMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+			new RegisteredKeyValueStateBackendMetaInfo<>(
 				StateDescriptor.Type.UNKNOWN,
 				"test",
 				IntSerializer.INSTANCE,
@@ -69,7 +70,7 @@ public class StateTableSnapshotCompatibilityTest {
 			cowStateTable.put(r.nextInt(10), r.nextInt(2), list);
 		}
 
-		StateSnapshot snapshot = cowStateTable.createSnapshot();
+		StateSnapshot snapshot = cowStateTable.stateSnapshot();
 
 		final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
 			new NestedMapsStateTable<>(keyContext, metaInfo);
@@ -83,7 +84,7 @@ public class StateTableSnapshotCompatibilityTest {
 			Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
 		}
 
-		snapshot = nestedMapsStateTable.createSnapshot();
+		snapshot = nestedMapsStateTable.stateSnapshot();
 		cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo);
 
 		restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange());
@@ -102,15 +103,15 @@ public class StateTableSnapshotCompatibilityTest {
 
 		final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024);
 		final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
-		final StateSnapshot.KeyGroupPartitionedSnapshot keyGroupPartitionedSnapshot = snapshot.partitionByKeyGroup();
+		final StateSnapshot.StateKeyGroupWriter keyGroupPartitionedSnapshot = snapshot.getKeyGroupWriter();
 		for (Integer keyGroup : keyGroupRange) {
-			keyGroupPartitionedSnapshot.writeMappingsInKeyGroup(dov, keyGroup);
+			keyGroupPartitionedSnapshot.writeStateInKeyGroup(dov, keyGroup);
 		}
 
 		final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf());
 		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
 
-		final StateTableByKeyGroupReader keyGroupReader =
+		final StateSnapshotKeyGroupReader keyGroupReader =
 			StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
 
 		for (Integer keyGroup : keyGroupRange) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
index 409c796..e196b1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotEnumConstantsTest.java
@@ -32,11 +32,11 @@ public class StateMetaInfoSnapshotEnumConstantsTest {
 		Assert.assertEquals(0, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal());
 		Assert.assertEquals(1, StateMetaInfoSnapshot.BackendStateType.OPERATOR.ordinal());
 		Assert.assertEquals(2, StateMetaInfoSnapshot.BackendStateType.BROADCAST.ordinal());
-		Assert.assertEquals(3, StateMetaInfoSnapshot.BackendStateType.TIMER.ordinal());
+		Assert.assertEquals(3, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.ordinal());
 		Assert.assertEquals("KEY_VALUE", StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.toString());
 		Assert.assertEquals("OPERATOR", StateMetaInfoSnapshot.BackendStateType.OPERATOR.toString());
 		Assert.assertEquals("BROADCAST", StateMetaInfoSnapshot.BackendStateType.BROADCAST.toString());
-		Assert.assertEquals("TIMER", StateMetaInfoSnapshot.BackendStateType.TIMER.toString());
+		Assert.assertEquals("PRIORITY_QUEUE", StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.toString());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index 363ecf8..9e9328b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -36,8 +36,10 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedStateFactory;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SnapshotResult;
@@ -118,6 +120,11 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
+	public boolean requiresLegacySynchronousTimerSnapshots() {
+		return false;
+	}
+
+	@Override
 	public void notifyCheckpointComplete(long checkpointId) {
 		// noop
 	}
@@ -167,15 +174,13 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Nonnull
 	@Override
-	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
 	create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementPriorityComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor) {
-		return new HeapPriorityQueueSet<>(
-			elementPriorityComparator,
-			keyExtractor,
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+		return new HeapPriorityQueueSet<T>(
+			PriorityComparator.forPriorityComparableObjects(),
+			KeyExtractorFunction.forKeyedObjects(),
 			0,
 			keyGroupRange,
 			0);

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index ceae3e1..209d18f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -177,7 +177,7 @@ class RocksDBAggregatingState<K, N, T, ACC, R>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBAggregatingState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index cf7974f..4d66357 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -103,7 +103,7 @@ class RocksDBFoldingState<K, N, T, ACC>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBFoldingState<>(
 			registerResult.f0,


[3/8] flink git commit: [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 622dd0c..0697463 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -63,14 +63,18 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.Keyed;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
@@ -177,7 +181,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private interface StateFactory {
 		<K, N, SV, S extends State, IS extends S> IS createState(
 			StateDescriptor<S, SV> stateDesc,
-			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+			Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 			RocksDBKeyedStateBackend<K> backend) throws Exception;
 	}
 
@@ -224,7 +228,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * Information about the k/v states as we create them. This is used to retrieve the
 	 * column family that is used for a state and also for sanity checks when restoring.
 	 */
-	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
+	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> kvStateInformation;
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -256,7 +260,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
 
 	/** Factory for priority queue state. */
-	private PriorityQueueSetFactory priorityQueueFactory;
+	private final PriorityQueueSetFactory priorityQueueFactory;
+
+	private RocksDBWriteBatchWrapper writeBatchWrapper;
 
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
@@ -322,7 +328,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
 				break;
 			default:
-				break;
+				throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
 		}
 
 		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
@@ -344,12 +350,15 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public <N> Stream<K> getKeys(String state, N namespace) {
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = kvStateInformation.get(state);
-		if (columnInfo == null) {
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnInfo = kvStateInformation.get(state);
+		if (columnInfo == null || !(columnInfo.f1 instanceof RegisteredKeyValueStateBackendMetaInfo)) {
 			return Stream.empty();
 		}
 
-		final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) columnInfo.f1.getNamespaceSerializer();
+		RegisteredKeyValueStateBackendMetaInfo<N, ?> registeredKeyValueStateBackendMetaInfo =
+			(RegisteredKeyValueStateBackendMetaInfo<N, ?>) columnInfo.f1;
+
+		final TypeSerializer<N> namespaceSerializer = registeredKeyValueStateBackendMetaInfo.getNamespaceSerializer();
 		final ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
 		boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
 		final byte[] nameSpaceBytes;
@@ -396,6 +405,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		// working on the disposed object results in SEGFAULTS.
 		if (db != null) {
 
+			IOUtils.closeQuietly(writeBatchWrapper);
+
 			// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
 			// DB is closed. See:
 			// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
@@ -403,16 +414,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			IOUtils.closeQuietly(defaultColumnFamily);
 
 			// ... continue with the ones created by Flink...
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> columnMetaData :
 				kvStateInformation.values()) {
 				IOUtils.closeQuietly(columnMetaData.f0);
 			}
 
-			// ... then close the priority queue related resources ...
-			if (priorityQueueFactory instanceof AutoCloseable) {
-				IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory);
-			}
-
 			// ... and finally close the DB instance ...
 			IOUtils.closeQuietly(db);
 
@@ -431,13 +437,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Nonnull
 	@Override
-	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+	create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor) {
-
-		return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor);
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+		return priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
 	}
 
 	private void cleanInstanceBasePath() {
@@ -482,6 +486,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
+
 		LOG.info("Initializing RocksDB keyed state backend.");
 
 		if (LOG.isDebugEnabled()) {
@@ -534,6 +539,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private void createDB() throws IOException {
 		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
 		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
 		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
@@ -679,7 +685,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
 
-				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
+				Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
 					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
 
 				if (registeredColumn == null) {
@@ -689,8 +695,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						nameBytes,
 						rocksDBKeyedStateBackend.columnOptions);
 
-					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-						new RegisteredKeyedBackendStateMetaInfo<>(restoredMetaInfo);
+					RegisteredStateMetaInfoBase stateMetaInfo =
+						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
 
 					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
@@ -987,12 +993,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ColumnFamilyHandle columnFamilyHandle,
 			StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
 
-			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredStateMetaInfoEntry =
 				stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
 			if (null == registeredStateMetaInfoEntry) {
-				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-					new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+				RegisteredStateMetaInfoBase stateMetaInfo =
+					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
 				registeredStateMetaInfoEntry =
 					new Tuple2<>(
@@ -1037,6 +1043,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					stateBackend.db = restoreDBInfo.db;
 					stateBackend.defaultColumnFamily = restoreDBInfo.defaultColumnFamilyHandle;
+					stateBackend.writeBatchWrapper =
+						new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 
 					for (int i = 0; i < restoreDBInfo.stateMetaInfoSnapshots.size(); ++i) {
 						getOrRegisterColumnFamilyHandle(
@@ -1061,6 +1069,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					Collections.emptyList(),
 					columnFamilyHandles);
 				stateBackend.defaultColumnFamily = columnFamilyHandles.get(0);
+				stateBackend.writeBatchWrapper =
+					new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 			}
 		}
 
@@ -1115,13 +1125,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// extract and store the default column family which is located at the first index
 			stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);
+			stateBackend.writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db, stateBackend.writeOptions);
 
 			for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
 				StateMetaInfoSnapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
 				ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-					new RegisteredKeyedBackendStateMetaInfo<>(stateMetaInfoSnapshot);
+				RegisteredStateMetaInfoBase stateMetaInfo =
+					RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
 				stateBackend.kvStateInformation.put(
 					stateMetaInfoSnapshot.getName(),
@@ -1290,14 +1301,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * already have a registered entry for that and return it (after some necessary state compatibility checks)
 	 * or create a new one if it does not exist.
 	 */
-	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(
+	private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, S>> tryRegisterKvStateInformation(
 			StateDescriptor<?, S> stateDesc,
 			TypeSerializer<N> namespaceSerializer) throws StateMigrationException {
 
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> stateInfo =
 			kvStateInformation.get(stateDesc.getName());
 
-		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo;
+		RegisteredKeyValueStateBackendMetaInfo<N, S> newMetaInfo;
 		if (stateInfo != null) {
 
 			@SuppressWarnings("unchecked")
@@ -1308,7 +1319,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," +
 					" but its corresponding restored snapshot cannot be found.");
 
-			newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
+			newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
 				restoredMetaInfoSnapshot,
 				namespaceSerializer,
 				stateDesc);
@@ -1317,13 +1328,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		} else {
 			String stateName = stateDesc.getName();
 
-			newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+			newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
 				stateDesc.getType(),
 				stateName,
 				namespaceSerializer,
 				stateDesc.getSerializer());
 
-			ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db);
+			ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
 
 			stateInfo = Tuple2.of(columnFamily, newMetaInfo);
 			kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1335,7 +1346,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Creates a column family handle for use with a k/v state.
 	 */
-	private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) {
+	private ColumnFamilyHandle createColumnFamily(String stateName) {
 		byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
 		Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
 			"The chosen state name 'default' collides with the name of the default column family!");
@@ -1359,7 +1370,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				stateDesc.getClass(), this.getClass());
 			throw new FlinkRuntimeException(message);
 		}
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult =
 			tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
 		return stateFactory.createState(stateDesc, registerResult, RocksDBKeyedStateBackend.this);
 	}
@@ -1382,7 +1393,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public int numStateEntries() {
 		int count = 0;
 
-		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
+		for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> column : kvStateInformation.values()) {
+			//TODO maybe filter only for k/v states
 			try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) {
 				rocksIterator.seekToFirst();
 
@@ -1405,7 +1417,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	@VisibleForTesting
 	static final class RocksDBMergeIterator implements AutoCloseable {
 
-		private final PriorityQueue<MergeIterator> heap;
+		private final PriorityQueue<RocksDBKeyedStateBackend.MergeIterator> heap;
 		private final int keyGroupPrefixByteCount;
 		private boolean newKeyGroup;
 		private boolean newKVState;
@@ -1763,6 +1775,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
 
+			// flush everything into db before taking a snapshot
+			writeBatchWrapper.flush();
+
 			final RocksDBFullSnapshotOperation<K> snapshotOperation =
 				new RocksDBFullSnapshotOperation<>(
 					RocksDBKeyedStateBackend.this,
@@ -1982,7 +1997,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
 
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+			for (Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tuple2 :
 				stateBackend.kvStateInformation.values()) {
 				// snapshot meta info
 				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
@@ -2433,7 +2448,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
 
 			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
+			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase>> stateMetaInfoEntry
 				: stateBackend.kvStateInformation.entrySet()) {
 				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
 			}
@@ -2625,10 +2640,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
 	 */
-	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable {
+	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
 		/** Default cache size per key-group. */
-		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+		private static final int DEFAULT_CACHES_SIZE = 1024;
 
 		/** A shared buffer to serialize elements for the priority queue. */
 		@Nonnull
@@ -2638,68 +2653,47 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		@Nonnull
 		private final DataOutputViewStreamWrapper elementSerializationOutView;
 
-		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
-		@Nonnull
-		private final RocksDBWriteBatchWrapper writeBatchWrapper;
-
-		/** Map to track all column families created to back priority queues. */
-		@Nonnull
-		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
-
-		/** The mandatory default column family, so that we can close it later. */
-		@Nonnull
-		private final ColumnFamilyHandle defaultColumnFamily;
-
-		/** Path of the RocksDB instance that holds the priority queues. */
-		@Nonnull
-		private final File pqInstanceRocksDBPath;
-
-		/** RocksDB instance that holds the priority queues. */
-		@Nonnull
-		private final RocksDB pqDb;
-
-		RocksDBPriorityQueueSetFactory() throws IOException {
-			this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb");
-			if (pqInstanceRocksDBPath.exists()) {
-				try {
-					FileUtils.deleteDirectory(pqInstanceRocksDBPath);
-				} catch (IOException ex) {
-					LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
-				}
-			}
-			List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-			this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		RocksDBPriorityQueueSetFactory() {
 			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
 			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
-			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions);
-			this.defaultColumnFamily = columnFamilyHandles.get(0);
-			this.priorityQueueColumnFamilies = new HashMap<>();
 		}
 
 		@Nonnull
 		@Override
-		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+		public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
+		create(
 			@Nonnull String stateName,
-			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-			@Nonnull PriorityComparator<T> elementPriorityComparator,
-			@Nonnull KeyExtractorFunction<T> keyExtractor) {
+			@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+			final PriorityComparator<T> priorityComparator =
+				PriorityComparator.forPriorityComparableObjects();
+
+			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+				kvStateInformation.get(stateName);
 
-			final ColumnFamilyHandle columnFamilyHandle =
-				priorityQueueColumnFamilies.computeIfAbsent(
-					stateName,
-					(name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+			if (entry == null) {
+				RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+					new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
+
+				final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
+
+				entry = new Tuple2<>(columnFamilyHandle, metaInfo);
+				kvStateInformation.put(stateName, entry);
+			}
+
+			final ColumnFamilyHandle columnFamilyHandle = entry.f0;
 
 			@Nonnull
 			TieBreakingPriorityComparator<T> tieBreakingComparator =
 				new TieBreakingPriorityComparator<>(
-					elementPriorityComparator,
+					priorityComparator,
 					byteOrderedElementSerializer,
 					elementSerializationOutStream,
 					elementSerializationOutView);
 
 			return new KeyGroupPartitionedPriorityQueue<>(
-				keyExtractor,
-				elementPriorityComparator,
+				KeyExtractorFunction.forKeyedObjects(),
+				priorityComparator,
 				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
 					@Nonnull
 					@Override
@@ -2714,7 +2708,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							new RocksDBOrderedSetStore<>(
 								keyGroupId,
 								keyGroupPrefixBytes,
-								pqDb,
+								db,
 								columnFamilyHandle,
 								byteOrderedElementSerializer,
 								elementSerializationOutStream,
@@ -2727,20 +2721,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				keyGroupRange,
 				numberOfKeyGroups);
 		}
+	}
 
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(writeBatchWrapper);
-			for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) {
-				IOUtils.closeQuietly(columnFamilyHandle);
-			}
-			IOUtils.closeQuietly(defaultColumnFamily);
-			IOUtils.closeQuietly(pqDb);
-			try {
-				FileUtils.deleteDirectory(pqInstanceRocksDBPath);
-			} catch (IOException ex) {
-				LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
-			}
-		}
+	@Override
+	public boolean requiresLegacySynchronousTimerSnapshots() {
+		return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 03faa44..aa5e93a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -257,7 +257,7 @@ class RocksDBListState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <E, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBListState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 9d00a67..4ec1f77 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -616,7 +616,7 @@ class RocksDBMapState<K, N, UK, UV>
 	@SuppressWarnings("unchecked")
 	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBMapState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index 5284314..4068c50 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -159,7 +159,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
 	public RocksToJavaIteratorAdapter orderedIterator() {
 
 		flushWriteBatch();
-
 		return new RocksToJavaIteratorAdapter(
 			new RocksIteratorWrapper(
 				db.newIterator(columnFamilyHandle)));

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index d138045..490960e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -172,7 +172,7 @@ class RocksDBReducingState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBReducingState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 2b60fc1..5ae894e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -116,7 +116,7 @@ class RocksDBValueState<K, N, V>
 	@SuppressWarnings("unchecked")
 	static <K, N, SV, S extends State, IS extends S> IS create(
 		StateDescriptor<S, SV> stateDesc,
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult,
+		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
 		RocksDBKeyedStateBackend<K> backend) {
 		return (IS) new RocksDBValueState<>(
 			registerResult.f0,

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
index d54f122..05adf4f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapper.java
@@ -102,6 +102,10 @@ public class RocksDBWriteBatchWrapper implements AutoCloseable {
 		batch.clear();
 	}
 
+	public WriteOptions getOptions() {
+		return options;
+	}
+
 	@Override
 	public void close() throws RocksDBException {
 		if (batch.count() != 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 797a26a..52ba3e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -402,7 +402,11 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @param context context that provides information and means required for taking a snapshot
 	 */
 	public void snapshotState(StateSnapshotContext context) throws Exception {
-		if (getKeyedStateBackend() != null) {
+		final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+		//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
+		if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
+			((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
+
 			KeyedStateCheckpointOutputStream out;
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
deleted file mode 100644
index cc7fbc4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.KeyExtractorFunction;
-import org.apache.flink.runtime.state.KeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateSnapshot;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Array;
-
-/**
- * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
- *
- * @param <T> type of the state elements.
- */
-public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
-
-	/** Function that extracts keys from elements. */
-	@Nonnull
-	private final KeyExtractorFunction<T> keyExtractor;
-
-	/** Copy of the heap array containing all the (immutable or deeply copied) elements. */
-	@Nonnull
-	private final T[] heapArrayCopy;
-
-	/** The element serializer. */
-	@Nonnull
-	private final TypeSerializer<T> elementSerializer;
-
-	/** The key-group range covered by this snapshot. */
-	@Nonnull
-	private final KeyGroupRange keyGroupRange;
-
-	/** The total number of key-groups in the job. */
-	@Nonnegative
-	private final int totalKeyGroups;
-
-	/** Result of partitioning the snapshot by key-group. */
-	@Nullable
-	private KeyGroupPartitionedSnapshot partitionedSnapshot;
-
-	HeapPriorityQueueStateSnapshot(
-		@Nonnull T[] heapArrayCopy,
-		@Nonnull KeyExtractorFunction<T> keyExtractor,
-		@Nonnull TypeSerializer<T> elementSerializer,
-		@Nonnull KeyGroupRange keyGroupRange,
-		@Nonnegative int totalKeyGroups) {
-
-		// TODO ensure that the array contains a deep copy of elements if we are *not* dealing with immutable types.
-		assert elementSerializer.isImmutableType();
-
-		this.keyExtractor = keyExtractor;
-		this.heapArrayCopy = heapArrayCopy;
-		this.elementSerializer = elementSerializer;
-		this.keyGroupRange = keyGroupRange;
-		this.totalKeyGroups = totalKeyGroups;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Nonnull
-	@Override
-	public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
-
-		if (partitionedSnapshot == null) {
-
-			T[] partitioningOutput = (T[]) Array.newInstance(
-				heapArrayCopy.getClass().getComponentType(),
-				heapArrayCopy.length);
-
-			KeyGroupPartitioner<T> keyGroupPartitioner =
-				new KeyGroupPartitioner<>(
-					heapArrayCopy,
-					heapArrayCopy.length,
-					partitioningOutput,
-					keyGroupRange,
-					totalKeyGroups,
-					keyExtractor,
-					elementSerializer::serialize);
-
-			partitionedSnapshot = keyGroupPartitioner.partitionByKeyGroup();
-		}
-
-		return partitionedSnapshot;
-	}
-
-	@Override
-	public void release() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index ad1617e..29b89c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -46,6 +46,11 @@ import java.util.Map;
 @Internal
 public class InternalTimeServiceManager<K> {
 
+	//TODO guard these constants with a test
+	private static final String TIMER_STATE_PREFIX = "_timer_state";
+	private static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
+	private static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
+
 	private final int totalKeyGroups;
 	private final KeyGroupRange localKeyGroupRange;
 	private final KeyContext keyContext;
@@ -55,12 +60,14 @@ public class InternalTimeServiceManager<K> {
 
 	private final Map<String, HeapInternalTimerService<K, ?>> timerServices;
 
+	private final boolean useLegacySynchronousSnapshots;
+
 	InternalTimeServiceManager(
-			int totalKeyGroups,
-			KeyGroupRange localKeyGroupRange,
-			KeyContext keyContext,
-			PriorityQueueSetFactory priorityQueueSetFactory,
-			ProcessingTimeService processingTimeService) {
+		int totalKeyGroups,
+		KeyGroupRange localKeyGroupRange,
+		KeyContext keyContext,
+		PriorityQueueSetFactory priorityQueueSetFactory,
+		ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) {
 
 		Preconditions.checkArgument(totalKeyGroups > 0);
 		this.totalKeyGroups = totalKeyGroups;
@@ -68,6 +75,7 @@ public class InternalTimeServiceManager<K> {
 		this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
 		this.keyContext = Preconditions.checkNotNull(keyContext);
 		this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
+		this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;
 
 		this.timerServices = new HashMap<>();
 	}
@@ -97,8 +105,8 @@ public class InternalTimeServiceManager<K> {
 				localKeyGroupRange,
 				keyContext,
 				processingTimeService,
-				createTimerPriorityQueue("__ts_" + name + "/processing_timers", timerSerializer),
-				createTimerPriorityQueue("__ts_" + name + "/event_timers", timerSerializer));
+				createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
+				createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
 
 			timerServices.put(name, timerService);
 		}
@@ -114,9 +122,7 @@ public class InternalTimeServiceManager<K> {
 		TimerSerializer<K, N> timerSerializer) {
 		return priorityQueueSetFactory.create(
 			name,
-			timerSerializer,
-			InternalTimer.getTimerComparator(),
-			InternalTimer.getKeyExtractorFunction());
+			timerSerializer);
 	}
 
 	public void advanceWatermark(Watermark watermark) throws Exception {
@@ -128,6 +134,7 @@ public class InternalTimeServiceManager<K> {
 	//////////////////				Fault Tolerance Methods				///////////////////
 
 	public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
+		Preconditions.checkState(useLegacySynchronousSnapshots);
 		InternalTimerServiceSerializationProxy<K> serializationProxy =
 			new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);
 
@@ -148,6 +155,10 @@ public class InternalTimeServiceManager<K> {
 		serializationProxy.read(stream);
 	}
 
+	public boolean isUseLegacySynchronousSnapshots() {
+		return useLegacySynchronousSnapshots;
+	}
+
 	////////////////////			Methods used ONLY IN TESTS				////////////////////
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
index f88b4fb..324f3fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.PriorityComparable;
 import org.apache.flink.runtime.state.PriorityComparator;
 
 import javax.annotation.Nonnull;
@@ -31,7 +33,7 @@ import javax.annotation.Nonnull;
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public interface InternalTimer<K, N> {
+public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {
 
 	/** Function to extract the key from a {@link InternalTimer}. */
 	KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;
@@ -48,6 +50,7 @@ public interface InternalTimer<K, N> {
 	 * Returns the key that is bound to this timer.
 	 */
 	@Nonnull
+	@Override
 	K getKey();
 
 	/**
@@ -55,14 +58,4 @@ public interface InternalTimer<K, N> {
 	 */
 	@Nonnull
 	N getNamespace();
-
-	@SuppressWarnings("unchecked")
-	static <T extends InternalTimer> PriorityComparator<T> getTimerComparator() {
-		return (PriorityComparator<T>) TIMER_COMPARATOR;
-	}
-
-	@SuppressWarnings("unchecked")
-	static <T extends InternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() {
-		return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index dbb74e5..4f762e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -19,9 +19,11 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -29,6 +31,8 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -96,7 +100,7 @@ public class InternalTimersSnapshotReaderWriters {
 		public final void writeTimersSnapshot(DataOutputView out) throws IOException {
 			writeKeyAndNamespaceSerializers(out);
 
-			TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>(
+			LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>(
 				timersSnapshot.getKeySerializer(),
 				timersSnapshot.getNamespaceSerializer());
 
@@ -215,8 +219,8 @@ public class InternalTimersSnapshotReaderWriters {
 
 			restoreKeyAndNamespaceSerializers(restoredTimersSnapshot, in);
 
-			TimerHeapInternalTimer.TimerSerializer<K, N> timerSerializer =
-				new TimerHeapInternalTimer.TimerSerializer<>(
+			LegacyTimerSerializer<K, N> timerSerializer =
+				new LegacyTimerSerializer<>(
 					restoredTimersSnapshot.getKeySerializer(),
 					restoredTimersSnapshot.getNamespaceSerializer());
 
@@ -289,4 +293,120 @@ public class InternalTimersSnapshotReaderWriters {
 			restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(1).f1);
 		}
 	}
+
+	/**
+	 * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
+	 */
+	public static class LegacyTimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
+
+		private static final long serialVersionUID = 1119562170939152304L;
+
+		@Nonnull
+		private final TypeSerializer<K> keySerializer;
+
+		@Nonnull
+		private final TypeSerializer<N> namespaceSerializer;
+
+		LegacyTimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
+			this.keySerializer = keySerializer;
+			this.namespaceSerializer = namespaceSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
+
+			final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
+			final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
+
+			if (keySerializerDuplicate == keySerializer &&
+				namespaceSerializerDuplicate == namespaceSerializer) {
+				// all delegate serializers seem stateless, so this is also stateless.
+				return this;
+			} else {
+				// at least one delegate serializer seems to be stateful, so we return a new instance.
+				return new LegacyTimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
+			}
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> createInstance() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
+			return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			// we do not have fixed length
+			return -1;
+		}
+
+		@Override
+		public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
+			keySerializer.serialize(record.getKey(), target);
+			namespaceSerializer.serialize(record.getNamespace(), target);
+			LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
+			K key = keySerializer.deserialize(source);
+			N namespace = namespaceSerializer.deserialize(source);
+			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+			return new TimerHeapInternalTimer<>(timestamp, key, namespace);
+		}
+
+		@Override
+		public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			keySerializer.copy(source, target);
+			namespaceSerializer.copy(source, target);
+			LongSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this ||
+				(obj != null && obj.getClass() == getClass() &&
+					keySerializer.equals(((LegacyTimerSerializer) obj).keySerializer) &&
+					namespaceSerializer.equals(((LegacyTimerSerializer) obj).namespaceSerializer));
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return true;
+		}
+
+		@Override
+		public int hashCode() {
+			return getClass().hashCode();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+
+		@Override
+		public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ca9cb0b..a6bee4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -209,7 +209,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
 			keyGroupRange,
 			keyContext,
 			keyedStatedBackend,
-			processingTimeService);
+			processingTimeService,
+			keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());
 
 		// and then initialize the timer services
 		for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index b9ef88e..a6194ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -19,19 +19,11 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
-
 /**
  * Implementation of {@link InternalTimer} to use with a {@link HeapPriorityQueueSet}.
  *
@@ -133,119 +125,8 @@ public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>,
 				'}';
 	}
 
-	/**
-	 * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}.
-	 */
-	public static class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
-
-		private static final long serialVersionUID = 1119562170939152304L;
-
-		@Nonnull
-		private final TypeSerializer<K> keySerializer;
-
-		@Nonnull
-		private final TypeSerializer<N> namespaceSerializer;
-
-		TimerSerializer(@Nonnull TypeSerializer<K> keySerializer, @Nonnull TypeSerializer<N> namespaceSerializer) {
-			this.keySerializer = keySerializer;
-			this.namespaceSerializer = namespaceSerializer;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() {
-
-			final TypeSerializer<K> keySerializerDuplicate = keySerializer.duplicate();
-			final TypeSerializer<N> namespaceSerializerDuplicate = namespaceSerializer.duplicate();
-
-			if (keySerializerDuplicate == keySerializer &&
-				namespaceSerializerDuplicate == namespaceSerializer) {
-				// all delegate serializers seem stateless, so this is also stateless.
-				return this;
-			} else {
-				// at least one delegate serializer seems to be stateful, so we return a new instance.
-				return new TimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate);
-			}
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> createInstance() {
-			throw new UnsupportedOperationException();
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from) {
-			return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace());
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
-			return copy(from);
-		}
-
-		@Override
-		public int getLength() {
-			// we do not have fixed length
-			return -1;
-		}
-
-		@Override
-		public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target) throws IOException {
-			keySerializer.serialize(record.getKey(), target);
-			namespaceSerializer.serialize(record.getNamespace(), target);
-			LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> deserialize(DataInputView source) throws IOException {
-			K key = keySerializer.deserialize(source);
-			N namespace = namespaceSerializer.deserialize(source);
-			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
-			return new TimerHeapInternalTimer<>(timestamp, key, namespace);
-		}
-
-		@Override
-		public TimerHeapInternalTimer<K, N> deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws IOException {
-			return deserialize(source);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			keySerializer.copy(source, target);
-			namespaceSerializer.copy(source, target);
-			LongSerializer.INSTANCE.copy(source, target);
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj == this ||
-				(obj != null && obj.getClass() == getClass() &&
-					keySerializer.equals(((TimerSerializer) obj).keySerializer) &&
-					namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer));
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return true;
-		}
-
-		@Override
-		public int hashCode() {
-			return getClass().hashCode();
-		}
-
-		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
-		}
-
-		@Override
-		public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			throw new UnsupportedOperationException("This serializer is not registered for managed state.");
-		}
+	@Override
+	public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
+		return Long.compare(timestamp, other.getTimestamp());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index 87a3159..73f42ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,8 +32,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
- * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is aligned with
- * {@link InternalTimer#getTimerComparator()}.
+ * A serializer for {@link TimerHeapInternalTimer} objects that produces a serialization format that is
+ * lexicographically aligned with the priority of the timers.
  *
  * @param <K> type of the timer key.
  * @param <N> type of the timer namespace.
@@ -201,13 +202,14 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 
 	@Override
 	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+		return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
 	}
 
 	@Override
 	public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
 		TypeSerializerConfigSnapshot configSnapshot) {
-		throw new UnsupportedOperationException("This serializer is currently not used to write state.");
+		//TODO this is just a mock (assuming no serializer updates) for now and needs a proper implementation! change this before release.
+		return CompatibilityResult.compatible();
 	}
 
 	@Nonnull
@@ -219,4 +221,25 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	public TypeSerializer<N> getNamespaceSerializer() {
 		return namespaceSerializer;
 	}
+
+	/**
+	 * Snapshot of a {@link TimerSerializer}.
+	 *
+	 * @param <K> type of key.
+	 * @param <N> type of namespace.
+	 */
+	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot {
+
+		public TimerSerializerConfigSnapshot() {
+		}
+
+		public TimerSerializerConfigSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
+			super(keySerializer, namespaceSerializer);
+		}
+
+		@Override
+		public int getVersion() {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 519f10e..957a535 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -910,8 +910,6 @@ public class HeapInternalTimerServiceTest {
 		PriorityQueueSetFactory priorityQueueSetFactory) {
 		return priorityQueueSetFactory.create(
 			name,
-			timerSerializer,
-			InternalTimer.getTimerComparator(),
-			InternalTimer.getKeyExtractorFunction());
+			timerSerializer);
 	}
 }