You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/16 21:06:34 UTC

[5/8] flink git commit: [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

Optimization for relaxed bulk polls

Deactivate optimization for now because it still contains a bug

This closes #6333.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbddf00b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbddf00b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbddf00b

Branch: refs/heads/master
Commit: dbddf00b75032c20df6e7aef26814da392347194
Parents: 0bbc91e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 13 11:56:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 16 22:11:57 2018 +0200

----------------------------------------------------------------------
 .../state/AbstractKeyedStateBackend.java        |   5 +
 .../state/BackendWritableBroadcastState.java    |   4 +-
 .../state/DefaultOperatorStateBackend.java      |  32 +--
 .../flink/runtime/state/HeapBroadcastState.java |  10 +-
 .../runtime/state/KeyExtractorFunction.java     |  13 ++
 .../runtime/state/KeyGroupPartitioner.java      |  72 +++++-
 .../org/apache/flink/runtime/state/Keyed.java   |  32 +++
 .../flink/runtime/state/PriorityComparable.java |  33 +++
 .../flink/runtime/state/PriorityComparator.java |   7 +
 .../runtime/state/PriorityQueueSetFactory.java  |   6 +-
 ...RegisteredBroadcastBackendStateMetaInfo.java | 155 -------------
 ...RegisteredBroadcastStateBackendMetaInfo.java | 169 ++++++++++++++
 .../RegisteredKeyValueStateBackendMetaInfo.java | 224 +++++++++++++++++++
 .../RegisteredKeyedBackendStateMetaInfo.java    | 212 ------------------
 .../RegisteredOperatorBackendStateMetaInfo.java | 142 ------------
 .../RegisteredOperatorStateBackendMetaInfo.java | 153 +++++++++++++
 ...steredPriorityQueueStateBackendMetaInfo.java |  87 +++++++
 .../state/RegisteredStateMetaInfoBase.java      |  17 ++
 .../flink/runtime/state/StateSnapshot.java      |  27 ++-
 .../state/StateSnapshotKeyGroupReader.java      |  44 ++++
 .../runtime/state/StateSnapshotRestore.java     |  47 ++++
 .../state/TieBreakingPriorityComparator.java    |   5 -
 .../heap/CachingInternalPriorityQueueSet.java   |  46 ++++
 .../state/heap/CopyOnWriteStateTable.java       |  13 +-
 .../heap/CopyOnWriteStateTableSnapshot.java     |  12 +-
 .../state/heap/HeapKeyedStateBackend.java       | 163 +++++++++-----
 .../runtime/state/heap/HeapPriorityQueue.java   |   3 +-
 .../state/heap/HeapPriorityQueueSetFactory.java |  18 +-
 ...HeapPriorityQueueSnapshotRestoreWrapper.java | 102 +++++++++
 .../heap/HeapPriorityQueueStateSnapshot.java    | 118 ++++++++++
 .../heap/KeyGroupPartitionedPriorityQueue.java  |  18 ++
 .../state/heap/NestedMapsStateTable.java        |  38 ++--
 .../flink/runtime/state/heap/StateTable.java    |  25 ++-
 .../state/heap/StateTableByKeyGroupReader.java  |  38 ----
 .../state/heap/StateTableByKeyGroupReaders.java |  89 +++-----
 .../state/metainfo/StateMetaInfoSnapshot.java   |   5 +-
 .../StateMetaInfoSnapshotReadersWriters.java    |  38 +++-
 .../state/KeyGroupPartitionerTestBase.java      |   4 +-
 .../runtime/state/SerializationProxiesTest.java |  54 ++---
 .../state/StateSnapshotCompressionTest.java     |   9 +-
 .../state/heap/CopyOnWriteStateTableTest.java   |  32 +--
 .../StateTableSnapshotCompatibilityTest.java    |  17 +-
 .../StateMetaInfoSnapshotEnumConstantsTest.java |   4 +-
 .../state/ttl/mock/MockKeyedStateBackend.java   |  19 +-
 .../state/RocksDBAggregatingState.java          |   4 +-
 .../streaming/state/RocksDBFoldingState.java    |   4 +-
 .../state/RocksDBKeyedStateBackend.java         | 188 +++++++---------
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../streaming/state/RocksDBMapState.java        |   4 +-
 .../streaming/state/RocksDBOrderedSetStore.java |   1 -
 .../streaming/state/RocksDBReducingState.java   |   4 +-
 .../streaming/state/RocksDBValueState.java      |   4 +-
 .../state/RocksDBWriteBatchWrapper.java         |   4 +
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../HeapPriorityQueueStateSnapshot.java         | 112 ----------
 .../operators/InternalTimeServiceManager.java   |  31 ++-
 .../streaming/api/operators/InternalTimer.java  |  15 +-
 .../InternalTimersSnapshotReaderWriters.java    | 126 ++++++++++-
 .../StreamTaskStateInitializerImpl.java         |   3 +-
 .../api/operators/TimerHeapInternalTimer.java   | 125 +----------
 .../api/operators/TimerSerializer.java          |  31 ++-
 .../operators/HeapInternalTimerServiceTest.java |   4 +-
 62 files changed, 1807 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c7f1bd9..17d24f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -315,4 +315,9 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	@VisibleForTesting
 	public abstract int numStateEntries();
 
+	// TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
+	public boolean requiresLegacySynchronousTimerSnapshots() {
+		return false;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
index 8daf07c..ba3985b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
@@ -36,7 +36,7 @@ public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V
 
 	long write(FSDataOutputStream out) throws IOException;
 
-	void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo);
+	void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo);
 
-	RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
+	RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index dc9b75f..f1d0b57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -209,7 +209,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 		if (broadcastState == null) {
 			broadcastState = new HeapBroadcastState<>(
-					new RegisteredBroadcastBackendStateMetaInfo<>(
+					new RegisteredBroadcastStateBackendMetaInfo<>(
 							name,
 							OperatorStateHandle.Mode.BROADCAST,
 							broadcastStateKeySerializer,
@@ -227,7 +227,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);
 
 			@SuppressWarnings("unchecked")
-			RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
+			RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);
 
 			// check compatibility to determine if state migration is required
 			CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
@@ -247,7 +247,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
 				// new serializer is compatible; use it to replace the old serializer
 				broadcastState.setStateMetaInfo(
-						new RegisteredBroadcastBackendStateMetaInfo<>(
+						new RegisteredBroadcastStateBackendMetaInfo<>(
 								name,
 								OperatorStateHandle.Mode.BROADCAST,
 								broadcastStateKeySerializer,
@@ -510,8 +510,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 				// Recreate all PartitionableListStates from the meta info
 				for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
 
-					final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
-						new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+					final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+						new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
 
 					if (restoredMetaInfo.getPartitionStateSerializer() == null ||
 						restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
@@ -546,8 +546,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 				for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {
 
-					final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
-						new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
+					final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+						new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);
 
 					if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
 						restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
@@ -613,7 +613,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		/**
 		 * Meta information of the state, including state name, assignment mode, and serializer
 		 */
-		private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
+		private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;
 
 		/**
 		 * The internal list the holds the elements of the state
@@ -625,12 +625,12 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		 */
 		private final ArrayListSerializer<S> internalListCopySerializer;
 
-		PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+		PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 			this(stateMetaInfo, new ArrayList<S>());
 		}
 
 		private PartitionableListState(
-				RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
+				RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
 				ArrayList<S> internalList) {
 
 			this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
@@ -643,11 +643,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
 		}
 
-		public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
+		public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
 			this.stateMetaInfo = stateMetaInfo;
 		}
 
-		public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
+		public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
 			return stateMetaInfo;
 		}
 
@@ -741,7 +741,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			// no restored state for the state name; simply create new state holder
 
 			partitionableListState = new PartitionableListState<>(
-				new RegisteredOperatorBackendStateMetaInfo<>(
+				new RegisteredOperatorStateBackendMetaInfo<>(
 					name,
 					partitionStateSerializer,
 					mode));
@@ -757,8 +757,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					mode);
 
 			StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
-			RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
-				new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
+			RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
+				new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
 
 			// check compatibility to determine if state migration is required
 			TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
@@ -772,7 +772,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			if (!stateCompatibility.isRequiresMigration()) {
 				// new serializer is compatible; use it to replace the old serializer
 				partitionableListState.setStateMetaInfo(
-					new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
+					new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
 			} else {
 				// TODO state migration currently isn't possible.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
index 7ebf1ce..a262103 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HeapBroadcastState.java
@@ -42,7 +42,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	/**
 	 * Meta information of the state, including state name, assignment mode, and serializer.
 	 */
-	private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
+	private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;
 
 	/**
 	 * The internal map the holds the elements of the state.
@@ -54,11 +54,11 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	 */
 	private final MapSerializer<K, V> internalMapCopySerializer;
 
-	HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+	HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 		this(stateMetaInfo, new HashMap<>());
 	}
 
-	private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
+	private HeapBroadcastState(final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
 
 		this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
 		this.backingMap = Preconditions.checkNotNull(internalMap);
@@ -70,12 +70,12 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
 	}
 
 	@Override
-	public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
+	public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
 		this.stateMetaInfo = stateMetaInfo;
 	}
 
 	@Override
-	public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() {
+	public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
 		return stateMetaInfo;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
index a3ce11c..79fafc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -28,9 +28,22 @@ import javax.annotation.Nonnull;
 @FunctionalInterface
 public interface KeyExtractorFunction<T> {
 
+	KeyExtractorFunction<? extends Keyed<?>> FOR_KEYED_OBJECTS = new KeyExtractorFunction<Keyed<?>>() {
+		@Nonnull
+		@Override
+		public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
+			return element.getKey();
+		}
+	};
+
 	/**
 	 * Returns the key for the given element by which the key-group can be computed.
 	 */
 	@Nonnull
 	Object extractKeyFromElement(@Nonnull T element);
+
+	@SuppressWarnings("unchecked")
+	static <T extends Keyed<?>> KeyExtractorFunction<T> forKeyedObjects() {
+		return (KeyExtractorFunction<T>) FOR_KEYED_OBJECTS;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 6a9dfb5..27d411c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
@@ -28,7 +29,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
- * Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
+ * Class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
  * with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
  * single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
  * better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
@@ -89,7 +90,7 @@ public class KeyGroupPartitioner<T> {
 
 	/** Cached result. */
 	@Nullable
-	protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;
+	protected StateSnapshot.StateKeyGroupWriter computedResult;
 
 	/**
 	 * Creates a new {@link KeyGroupPartitioner}.
@@ -131,7 +132,7 @@ public class KeyGroupPartitioner<T> {
 	/**
 	 * Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
 	 */
-	public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+	public StateSnapshot.StateKeyGroupWriter partitionByKeyGroup() {
 		if (computedResult == null) {
 			reportAllElementKeyGroups();
 			buildHistogramByAccumulatingCounts();
@@ -198,7 +199,7 @@ public class KeyGroupPartitioner<T> {
 	 * This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
 	 * w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
 	 */
-	public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
+	private static class PartitioningResult<T> implements StateSnapshot.StateKeyGroupWriter {
 
 		/**
 		 * Function to write one element to a {@link DataOutputView}.
@@ -249,7 +250,7 @@ public class KeyGroupPartitioner<T> {
 		}
 
 		@Override
-		public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
+		public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
 
 			int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
 			int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);
@@ -264,6 +265,43 @@ public class KeyGroupPartitioner<T> {
 		}
 	}
 
+	public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
+			@Nonnull ElementReaderFunction<T> readerFunction,
+			@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+		return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
+	}
+
+	/**
+	 * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}.
+	 *
+	 * @param <T> type of the elements to read.
+	 */
+	private static class PartitioningResultKeyGroupReader<T> implements StateSnapshotKeyGroupReader {
+
+		@Nonnull
+		private final ElementReaderFunction<T> readerFunction;
+
+		@Nonnull
+		private final KeyGroupElementsConsumer<T> elementConsumer;
+
+		public PartitioningResultKeyGroupReader(
+			@Nonnull ElementReaderFunction<T> readerFunction,
+			@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+
+			this.readerFunction = readerFunction;
+			this.elementConsumer = elementConsumer;
+		}
+
+		@Override
+		public void readMappingsInKeyGroup(@Nonnull DataInputView in, @Nonnegative int keyGroupId) throws IOException {
+			int numElements = in.readInt();
+			for (int i = 0; i < numElements; i++) {
+				T element = readerFunction.readElement(in);
+				elementConsumer.consume(element, keyGroupId);
+			}
+		}
+	}
+
 	/**
 	 * This functional interface defines how one element is written to a {@link DataOutputView}.
 	 *
@@ -281,4 +319,28 @@ public class KeyGroupPartitioner<T> {
 		 */
 		void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
 	}
+
+	/**
+	 * This functional interface defines how one element is read from a {@link DataInputView}.
+	 *
+	 * @param <T> type of the read elements.
+	 */
+	@FunctionalInterface
+	public interface ElementReaderFunction<T> {
+
+		@Nonnull
+		T readElement(@Nonnull DataInputView div) throws IOException;
+	}
+
+	/**
+	 * Functional interface to consume elements from a key group.
+	 *
+	 * @param <T> type of the consumed elements.
+	 */
+	@FunctionalInterface
+	public interface KeyGroupElementsConsumer<T> {
+
+
+		void consume(@Nonnull T element, @Nonnegative int keyGroupId) throws IOException;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
new file mode 100644
index 0000000..4320b0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Keyed.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * Interface for objects that have a key attribute.
+ *
+ * @param <K> type of the key.
+ */
+public interface Keyed<K> {
+
+	/**
+	 * Returns the key attribute.
+	 */
+	K getKey();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
new file mode 100644
index 0000000..4d6cce0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface for objects that can be compared by priority.
+ * @param <T> type of the compared objects.
+ */
+public interface PriorityComparable<T> {
+
+	/**
+	 * @see PriorityComparator#comparePriority(Object, Object)
+	 */
+	int comparePriorityTo(@Nonnull T other);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
index 2f6f5a7..ec36924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -30,6 +30,8 @@ package org.apache.flink.runtime.state;
 @FunctionalInterface
 public interface PriorityComparator<T> {
 
+	PriorityComparator<? extends PriorityComparable<Object>> FOR_PRIORITY_COMPARABLE_OBJECTS = PriorityComparable::comparePriorityTo;
+
 	/**
 	 * Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
 	 * argument has lower, equal to, or higher priority than the second.
@@ -39,4 +41,9 @@ public interface PriorityComparator<T> {
 	 * priority than the second.
 	 */
 	int comparePriority(T left, T right);
+
+	@SuppressWarnings("unchecked")
+	static <T extends PriorityComparable<?>> PriorityComparator<T> forPriorityComparableObjects() {
+		return (PriorityComparator<T>) FOR_PRIORITY_COMPARABLE_OBJECTS;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index 6f509c0..2245e72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -38,9 +38,7 @@ public interface PriorityQueueSetFactory {
 	 * @return the queue with the specified unique name.
 	 */
 	@Nonnull
-	<T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+	<T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
 		@Nonnull String stateName,
-		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
-		@Nonnull PriorityComparator<T> elementPriorityComparator,
-		@Nonnull KeyExtractorFunction<T> keyExtractor);
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
deleted file mode 100644
index 98a8195..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-public class RegisteredBroadcastBackendStateMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
-
-	/** The mode how elements in this state are assigned to tasks during restore. */
-	private final OperatorStateHandle.Mode assignmentMode;
-
-	/** The type serializer for the keys in the map state. */
-	private final TypeSerializer<K> keySerializer;
-
-	/** The type serializer for the values in the map state. */
-	private final TypeSerializer<V> valueSerializer;
-
-	public RegisteredBroadcastBackendStateMetaInfo(
-			final String name,
-			final OperatorStateHandle.Mode assignmentMode,
-			final TypeSerializer<K> keySerializer,
-			final TypeSerializer<V> valueSerializer) {
-
-		super(name);
-		Preconditions.checkArgument(assignmentMode != null && assignmentMode == OperatorStateHandle.Mode.BROADCAST);
-		this.assignmentMode = assignmentMode;
-		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.valueSerializer = Preconditions.checkNotNull(valueSerializer);
-	}
-
-	public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> copy) {
-		this(
-			Preconditions.checkNotNull(copy).name,
-			copy.assignmentMode,
-			copy.keySerializer.duplicate(),
-			copy.valueSerializer.duplicate());
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			snapshot.getName(),
-			OperatorStateHandle.Mode.valueOf(
-				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-			(TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-			(TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
-	}
-
-	/**
-	 * Creates a deep copy of the itself.
-	 */
-	public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
-		return new RegisteredBroadcastBackendStateMetaInfo<>(this);
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-			assignmentMode.toString());
-		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
-		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		serializerMap.put(keySerializerKey, keySerializer.duplicate());
-		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
-		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-
-	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
-	}
-
-	public TypeSerializer<V> getValueSerializer() {
-		return valueSerializer;
-	}
-
-	public OperatorStateHandle.Mode getAssignmentMode() {
-		return assignmentMode;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-
-		if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
-			return false;
-		}
-
-		final RegisteredBroadcastBackendStateMetaInfo other =
-				(RegisteredBroadcastBackendStateMetaInfo) obj;
-
-		return Objects.equals(name, other.getName())
-				&& Objects.equals(assignmentMode, other.getAssignmentMode())
-				&& Objects.equals(keySerializer, other.getKeySerializer())
-				&& Objects.equals(valueSerializer, other.getValueSerializer());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = name.hashCode();
-		result = 31 * result + assignmentMode.hashCode();
-		result = 31 * result + keySerializer.hashCode();
-		result = 31 * result + valueSerializer.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredBroadcastBackendStateMetaInfo{" +
-				"name='" + name + '\'' +
-				", keySerializer=" + keySerializer +
-				", valueSerializer=" + valueSerializer +
-				", assignmentMode=" + assignmentMode +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
new file mode 100644
index 0000000..02ab8ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredStateMetaInfoBase {
+
+	/** The mode how elements in this state are assigned to tasks during restore. */
+	@Nonnull
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/** The type serializer for the keys in the map state. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The type serializer for the values in the map state. */
+	@Nonnull
+	private final TypeSerializer<V> valueSerializer;
+
+	public RegisteredBroadcastStateBackendMetaInfo(
+			@Nonnull final String name,
+			@Nonnull final OperatorStateHandle.Mode assignmentMode,
+			@Nonnull final TypeSerializer<K> keySerializer,
+			@Nonnull final TypeSerializer<V> valueSerializer) {
+
+		super(name);
+		Preconditions.checkArgument(assignmentMode == OperatorStateHandle.Mode.BROADCAST);
+		this.assignmentMode = assignmentMode;
+		this.keySerializer = keySerializer;
+		this.valueSerializer = valueSerializer;
+	}
+
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull RegisteredBroadcastStateBackendMetaInfo<K, V> copy) {
+		this(
+			Preconditions.checkNotNull(copy).name,
+			copy.assignmentMode,
+			copy.keySerializer.duplicate(),
+			copy.valueSerializer.duplicate());
+	}
+
+	@SuppressWarnings("unchecked")
+	public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			snapshot.getName(),
+			OperatorStateHandle.Mode.valueOf(
+				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
+			(TypeSerializer<K>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+			(TypeSerializer<V>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
+	}
+
+	/**
+	 * Creates a deep copy of itself.
+	 */
+	@Nonnull
+	public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
+		return new RegisteredBroadcastStateBackendMetaInfo<>(this);
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	@Nonnull
+	public TypeSerializer<K> getKeySerializer() {
+		return keySerializer;
+	}
+
+	@Nonnull
+	public TypeSerializer<V> getValueSerializer() {
+		return valueSerializer;
+	}
+
+	@Nonnull
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	@Override
+	public boolean equals(Object obj) { // equal iff name, assignment mode and both serializers are equal
+		if (obj == this) {
+			return true;
+		}
+
+		if (!(obj instanceof RegisteredBroadcastStateBackendMetaInfo)) {
+			return false;
+		}
+
+		final RegisteredBroadcastStateBackendMetaInfo<?, ?> other = // wildcard type instead of a raw type
+				(RegisteredBroadcastStateBackendMetaInfo<?, ?>) obj;
+
+		return Objects.equals(name, other.getName())
+				&& Objects.equals(assignmentMode, other.getAssignmentMode())
+				&& Objects.equals(keySerializer, other.getKeySerializer())
+				&& Objects.equals(valueSerializer, other.getValueSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = name.hashCode();
+		result = 31 * result + assignmentMode.hashCode();
+		result = 31 * result + keySerializer.hashCode();
+		result = 31 * result + valueSerializer.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() { // label uses this class's current name, not the pre-rename one
+		return "RegisteredBroadcastStateBackendMetaInfo{" +
+				"name='" + name + '\'' +
+				", keySerializer=" + keySerializer +
+				", valueSerializer=" + valueSerializer +
+				", assignmentMode=" + assignmentMode +
+				'}';
+	}
+
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+			assignmentMode.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(keySerializerKey, keySerializer.duplicate());
+		serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.BROADCAST,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
new file mode 100644
index 0000000..d49a05c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StateMigrationException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
+
+	@Nonnull
+	private final StateDescriptor.Type stateType;
+	@Nonnull
+	private final TypeSerializer<N> namespaceSerializer;
+	@Nonnull
+	private final TypeSerializer<S> stateSerializer;
+
+	public RegisteredKeyValueStateBackendMetaInfo(
+			@Nonnull StateDescriptor.Type stateType,
+			@Nonnull String name,
+			@Nonnull TypeSerializer<N> namespaceSerializer,
+			@Nonnull TypeSerializer<S> stateSerializer) {
+
+		super(name);
+		this.stateType = stateType;
+		this.namespaceSerializer = namespaceSerializer;
+		this.stateSerializer = stateSerializer;
+	}
+
+	@SuppressWarnings("unchecked")
+	public RegisteredKeyValueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
+			snapshot.getName(),
+			(TypeSerializer<N>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER)),
+			(TypeSerializer<S>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
+	}
+
+	@Nonnull
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	@Nonnull
+	public TypeSerializer<N> getNamespaceSerializer() {
+		return namespaceSerializer;
+	}
+
+	@Nonnull
+	public TypeSerializer<S> getStateSerializer() {
+		return stateSerializer;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		RegisteredKeyValueStateBackendMetaInfo<?, ?> that = (RegisteredKeyValueStateBackendMetaInfo<?, ?>) o;
+
+		if (!stateType.equals(that.stateType)) {
+			return false;
+		}
+
+		if (!getName().equals(that.getName())) {
+			return false;
+		}
+
+		return getStateSerializer().equals(that.getStateSerializer())
+				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
+	}
+
+	@Override
+	public String toString() { // label uses this class's current name, not the pre-rename one
+		return "RegisteredKeyValueStateBackendMetaInfo{" +
+				"stateType=" + stateType +
+				", name='" + name + '\'' +
+				", namespaceSerializer=" + namespaceSerializer +
+				", stateSerializer=" + stateSerializer +
+				'}';
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getStateType().hashCode();
+		result = 31 * result + getNamespaceSerializer().hashCode();
+		result = 31 * result + getStateSerializer().hashCode();
+		return result;
+	}
+
+	/**
+	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
+	 * This checks that the descriptor specifies identical names and state types, as well as
+	 * serializers that are compatible for the restored k/v state bytes.
+	 */
+	@Nonnull
+	public static <N, S> RegisteredKeyValueStateBackendMetaInfo<N, S> resolveKvStateCompatibility(
+		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
+		TypeSerializer<N> newNamespaceSerializer,
+		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
+
+		Preconditions.checkState(
+			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
+			"Incompatible state names. " +
+				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
+				"registered with [" + newStateDescriptor.getName() + "].");
+
+		final StateDescriptor.Type restoredType =
+			StateDescriptor.Type.valueOf(
+				restoredStateMetaInfoSnapshot.getOption(
+					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
+			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
+
+			Preconditions.checkState(
+				newStateDescriptor.getType() == restoredType,
+				"Incompatible state types. " +
+					"Was [" + restoredType + "], " +
+					"registered with [" + newStateDescriptor.getType() + "].");
+		}
+
+		// check compatibility results to determine if state migration is required
+		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			null,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
+			newNamespaceSerializer);
+
+		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
+		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
+			restoredStateMetaInfoSnapshot.getTypeSerializer(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			UnloadableDummyTypeSerializer.class,
+			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
+			newStateSerializer);
+
+		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
+			// TODO state migration currently isn't possible.
+			throw new StateMigrationException("State migration isn't supported, yet.");
+		} else {
+			return new RegisteredKeyValueStateBackendMetaInfo<>(
+				newStateDescriptor.getType(),
+				newStateDescriptor.getName(),
+				newNamespaceSerializer,
+				newStateSerializer);
+		}
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
+			stateType.toString());
+		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
+		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
+		serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
+		serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
deleted file mode 100644
index e9b230a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StateMigrationException;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredKeyedBackendStateMetaInfo<N, S> extends RegisteredStateMetaInfoBase {
-
-	private final StateDescriptor.Type stateType;
-	private final TypeSerializer<N> namespaceSerializer;
-	private final TypeSerializer<S> stateSerializer;
-
-	public RegisteredKeyedBackendStateMetaInfo(
-			StateDescriptor.Type stateType,
-			String name,
-			TypeSerializer<N> namespaceSerializer,
-			TypeSerializer<S> stateSerializer) {
-
-		super(name);
-		this.stateType = checkNotNull(stateType);
-		this.namespaceSerializer = checkNotNull(namespaceSerializer);
-		this.stateSerializer = checkNotNull(stateSerializer);
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredKeyedBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			StateDescriptor.Type.valueOf(snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)),
-			snapshot.getName(),
-			(TypeSerializer<N>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
-	}
-
-	public StateDescriptor.Type getStateType() {
-		return stateType;
-	}
-
-	public TypeSerializer<N> getNamespaceSerializer() {
-		return namespaceSerializer;
-	}
-
-	public TypeSerializer<S> getStateSerializer() {
-		return stateSerializer;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
-
-		if (!stateType.equals(that.stateType)) {
-			return false;
-		}
-
-		if (!getName().equals(that.getName())) {
-			return false;
-		}
-
-		return getStateSerializer().equals(that.getStateSerializer())
-				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredKeyedBackendStateMetaInfo{" +
-				"stateType=" + stateType +
-				", name='" + name + '\'' +
-				", namespaceSerializer=" + namespaceSerializer +
-				", stateSerializer=" + stateSerializer +
-				'}';
-	}
-
-	@Override
-	public int hashCode() {
-		int result = getName().hashCode();
-		result = 31 * result + getStateType().hashCode();
-		result = 31 * result + getNamespaceSerializer().hashCode();
-		result = 31 * result + getStateSerializer().hashCode();
-		return result;
-	}
-
-	/**
-	 * Checks compatibility of a restored k/v state, with the new {@link StateDescriptor} provided to it.
-	 * This checks that the descriptor specifies identical names and state types, as well as
-	 * serializers that are compatible for the restored k/v state bytes.
-	 */
-	public static <N, S> RegisteredKeyedBackendStateMetaInfo<N, S> resolveKvStateCompatibility(
-		StateMetaInfoSnapshot restoredStateMetaInfoSnapshot,
-		TypeSerializer<N> newNamespaceSerializer,
-		StateDescriptor<?, S> newStateDescriptor) throws StateMigrationException {
-
-		Preconditions.checkState(
-			Objects.equals(newStateDescriptor.getName(), restoredStateMetaInfoSnapshot.getName()),
-			"Incompatible state names. " +
-				"Was [" + restoredStateMetaInfoSnapshot.getName() + "], " +
-				"registered with [" + newStateDescriptor.getName() + "].");
-
-		final StateDescriptor.Type restoredType =
-			StateDescriptor.Type.valueOf(
-				restoredStateMetaInfoSnapshot.getOption(
-					StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
-
-		if (!Objects.equals(newStateDescriptor.getType(), StateDescriptor.Type.UNKNOWN)
-			&& !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) {
-
-			Preconditions.checkState(
-				newStateDescriptor.getType() == restoredType,
-				"Incompatible state types. " +
-					"Was [" + restoredType + "], " +
-					"registered with [" + newStateDescriptor.getType() + "].");
-		}
-
-		// check compatibility results to determine if state migration is required
-		CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			null,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-			newNamespaceSerializer);
-
-		TypeSerializer<S> newStateSerializer = newStateDescriptor.getSerializer();
-		CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-			restoredStateMetaInfoSnapshot.getTypeSerializer(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			UnloadableDummyTypeSerializer.class,
-			restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			newStateSerializer);
-
-		if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-			// TODO state migration currently isn't possible.
-			throw new StateMigrationException("State migration isn't supported, yet.");
-		} else {
-			return new RegisteredKeyedBackendStateMetaInfo<>(
-				newStateDescriptor.getType(),
-				newStateDescriptor.getName(),
-				newNamespaceSerializer,
-				newStateSerializer);
-		}
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE.toString(),
-			stateType.toString());
-		Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
-		String namespaceSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString();
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		serializerMap.put(namespaceSerializerKey, namespaceSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(namespaceSerializerKey, namespaceSerializer.snapshotConfiguration());
-		serializerMap.put(valueSerializerKey, stateSerializer.duplicate());
-		serializerConfigSnapshotsMap.put(valueSerializerKey, stateSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
deleted file mode 100644
index f314add..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nonnull;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Compound meta information for a registered state in an operator state backend.
- * This contains the state name, assignment mode, and state partition serializer.
- *
- * @param <S> Type of the state.
- */
-public class RegisteredOperatorBackendStateMetaInfo<S> extends RegisteredStateMetaInfoBase {
-
-	/**
-	 * The mode how elements in this state are assigned to tasks during restore
-	 */
-	private final OperatorStateHandle.Mode assignmentMode;
-
-	/**
-	 * The type serializer for the elements in the state list
-	 */
-	private final TypeSerializer<S> partitionStateSerializer;
-
-	public RegisteredOperatorBackendStateMetaInfo(
-			String name,
-			TypeSerializer<S> partitionStateSerializer,
-			OperatorStateHandle.Mode assignmentMode) {
-		super(Preconditions.checkNotNull(name));
-		this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
-		this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
-	}
-
-	private RegisteredOperatorBackendStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> copy) {
-		this(
-			Preconditions.checkNotNull(copy).name,
-			copy.partitionStateSerializer.duplicate(),
-			copy.assignmentMode);
-	}
-
-	@SuppressWarnings("unchecked")
-	public RegisteredOperatorBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
-		this(
-			snapshot.getName(),
-			(TypeSerializer<S>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-			OperatorStateHandle.Mode.valueOf(
-				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
-		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
-	}
-
-	/**
-	 * Creates a deep copy of the itself.
-	 */
-	public RegisteredOperatorBackendStateMetaInfo<S> deepCopy() {
-		return new RegisteredOperatorBackendStateMetaInfo<>(this);
-	}
-
-	@Nonnull
-	@Override
-	public StateMetaInfoSnapshot snapshot() {
-		Map<String, String> optionsMap = Collections.singletonMap(
-			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-			assignmentMode.toString());
-		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-		Map<String, TypeSerializer<?>> serializerMap =
-			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
-		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
-			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
-
-		return new StateMetaInfoSnapshot(
-			name,
-			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
-			optionsMap,
-			serializerConfigSnapshotsMap,
-			serializerMap);
-	}
-
-	public OperatorStateHandle.Mode getAssignmentMode() {
-		return assignmentMode;
-	}
-
-	public TypeSerializer<S> getPartitionStateSerializer() {
-		return partitionStateSerializer;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-
-		if (obj == null) {
-			return false;
-		}
-
-		return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
-			&& name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
-			&& assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
-			&& partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = getName().hashCode();
-		result = 31 * result + getAssignmentMode().hashCode();
-		result = 31 * result + getPartitionStateSerializer().hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "RegisteredOperatorBackendStateMetaInfo{" +
-			"name='" + name + "\'" +
-			", assignmentMode=" + assignmentMode +
-			", partitionStateSerializer=" + partitionStateSerializer +
-			'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
new file mode 100644
index 0000000..b65671e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {
+
+	/**
+	 * The mode that determines how elements in this state are assigned to tasks during restore
+	 */
+	@Nonnull
+	private final OperatorStateHandle.Mode assignmentMode;
+
+	/**
+	 * The type serializer for the elements in the state list
+	 */
+	@Nonnull
+	private final TypeSerializer<S> partitionStateSerializer;
+
+	public RegisteredOperatorStateBackendMetaInfo(
+			@Nonnull String name,
+			@Nonnull TypeSerializer<S> partitionStateSerializer,
+			@Nonnull OperatorStateHandle.Mode assignmentMode) {
+		super(name);
+		this.partitionStateSerializer = partitionStateSerializer;
+		this.assignmentMode = assignmentMode;
+	}
+
+	private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
+		this(
+			Preconditions.checkNotNull(copy).name,
+			copy.partitionStateSerializer.duplicate(),
+			copy.assignmentMode);
+	}
+
+	@SuppressWarnings("unchecked")
+	public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(
+			snapshot.getName(),
+			(TypeSerializer<S>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
+			OperatorStateHandle.Mode.valueOf(
+				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
+	}
+
+	/**
+	 * Creates a deep copy of itself.
+	 */
+	@Nonnull
+	public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
+		return new RegisteredOperatorStateBackendMetaInfo<>(this);
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	@Nonnull
+	public OperatorStateHandle.Mode getAssignmentMode() {
+		return assignmentMode;
+	}
+
+	@Nonnull
+	public TypeSerializer<S> getPartitionStateSerializer() {
+		return partitionStateSerializer;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+
+		if (obj == null) {
+			return false;
+		}
+
+		return (obj instanceof RegisteredOperatorStateBackendMetaInfo)
+			&& name.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getName())
+			&& assignmentMode.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getAssignmentMode())
+			&& partitionStateSerializer.equals(((RegisteredOperatorStateBackendMetaInfo) obj).getPartitionStateSerializer());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getName().hashCode();
+		result = 31 * result + getAssignmentMode().hashCode();
+		result = 31 * result + getPartitionStateSerializer().hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "RegisteredOperatorStateBackendMetaInfo{" +
+			"name='" + name + "\'" +
+			", assignmentMode=" + assignmentMode +
+			", partitionStateSerializer=" + partitionStateSerializer +
+			'}';
+	}
+
+	@Nonnull
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, String> optionsMap = Collections.singletonMap(
+			StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
+			assignmentMode.toString());
+		String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
+		Map<String, TypeSerializer<?>> serializerMap =
+			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
+		Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap =
+			Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.OPERATOR,
+			optionsMap,
+			serializerConfigSnapshotsMap,
+			serializerMap);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
new file mode 100644
index 0000000..9ef23ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Meta information about a priority queue state in a backend.
+ */
+public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredStateMetaInfoBase {
+
+	@Nonnull
+	private final TypeSerializer<T> elementSerializer;
+
+	public RegisteredPriorityQueueStateBackendMetaInfo(
+		@Nonnull String name,
+		@Nonnull TypeSerializer<T> elementSerializer) {
+
+		super(name);
+		this.elementSerializer = elementSerializer;
+	}
+
+	@SuppressWarnings("unchecked")
+	public RegisteredPriorityQueueStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+		this(snapshot.getName(),
+			(TypeSerializer<T>) Preconditions.checkNotNull(
+				snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
+		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType());
+	}
+
+	@Nonnull
+	@Override
+	public StateMetaInfoSnapshot snapshot() {
+		return computeSnapshot();
+	}
+
+	@Nonnull
+	public TypeSerializer<T> getElementSerializer() {
+		return elementSerializer;
+	}
+
+	private StateMetaInfoSnapshot computeSnapshot() {
+		Map<String, TypeSerializer<?>> serializerMap =
+			Collections.singletonMap(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+				elementSerializer.duplicate());
+		Map<String, TypeSerializerConfigSnapshot> serializerSnapshotMap =
+			Collections.singletonMap(
+				StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(),
+				elementSerializer.snapshotConfiguration());
+
+		return new StateMetaInfoSnapshot(
+			name,
+			StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE,
+			Collections.emptyMap(),
+			serializerSnapshotMap,
+			serializerMap);
+	}
+
+	public RegisteredPriorityQueueStateBackendMetaInfo<T> deepCopy() {
+		return new RegisteredPriorityQueueStateBackendMetaInfo<>(name, elementSerializer.duplicate());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index 4132d14..b7dff59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,4 +42,21 @@ public abstract class RegisteredStateMetaInfoBase {
 
 	@Nonnull
 	public abstract StateMetaInfoSnapshot snapshot();
+
+	public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull StateMetaInfoSnapshot snapshot) {
+
+		final StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();
+		switch (backendStateType) {
+			case KEY_VALUE:
+				return new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+			case OPERATOR:
+				return new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+			case BROADCAST:
+				return new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+			case PRIORITY_QUEUE:
+				return new RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
+			default:
+				throw new IllegalArgumentException("Unknown backend state type: " + backendStateType);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
index 1fcac5c..54180f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshot.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
@@ -30,38 +32,45 @@ import java.io.IOException;
  * All snapshots should be released after usage. This interface outlines the asynchronous snapshot life-cycle, which
  * typically looks as follows. In the synchronous part of a checkpoint, an instance of {@link StateSnapshot} is produced
  * for a state and captures the state at this point in time. Then, in the asynchronous part of the checkpoint, the user
- * calls {@link #partitionByKeyGroup()} to ensure that the snapshot is partitioned into key-groups. For state that is
- * already partitioned, this can be a NOP. The returned {@link KeyGroupPartitionedSnapshot} can be used by the caller
+ * calls {@link #getKeyGroupWriter()} to ensure that the snapshot is partitioned into key-groups. For state that is
+ * already partitioned, this can be a NOP. The returned {@link StateKeyGroupWriter} can be used by the caller
  * to write the state by key-group. As a last step, when the state is completely written, the user calls
  * {@link #release()}.
  */
+@Internal
 public interface StateSnapshot {
 
 	/**
-	 * This method partitions the snapshot by key-group and then returns a {@link KeyGroupPartitionedSnapshot}.
+	 * This method returns {@link StateKeyGroupWriter} and should be called in the asynchronous part of the snapshot.
 	 */
 	@Nonnull
-	KeyGroupPartitionedSnapshot partitionByKeyGroup();
+	StateKeyGroupWriter getKeyGroupWriter();
+
+	/**
+	 * Returns a snapshot of the state's meta data.
+	 */
+	@Nonnull
+	StateMetaInfoSnapshot getMetaInfoSnapshot();
 
 	/**
 	 * Release the snapshot. All snapshots should be released when they are no longer used because some implementation
-	 * can only release resources after a release. Produced {@link KeyGroupPartitionedSnapshot} should no longer be used
+	 * can only release resources after a release. Produced {@link StateKeyGroupWriter} should no longer be used
 	 * after calling this method.
 	 */
 	void release();
 
 	/**
-	 * Interface for writing a snapshot after it is partitioned into key-groups.
+	 * Interface for writing a snapshot that is partitioned into key-groups.
 	 */
-	interface KeyGroupPartitionedSnapshot {
+	interface StateKeyGroupWriter {
 		/**
-		 * Writes the data for the specified key-group to the output. You must call {@link #partitionByKeyGroup()} once
+		 * Writes the data for the specified key-group to the output. You must call {@link #getKeyGroupWriter()} once
 		 * before first calling this method.
 		 *
 		 * @param dov        the output.
 		 * @param keyGroupId the key-group to write.
 		 * @throws IOException on write-related problems.
 		 */
-		void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
+		void writeStateInKeyGroup(@Nonnull DataOutputView dov, @Nonnegative int keyGroupId) throws IOException;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
new file mode 100644
index 0000000..b6b91ee
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotKeyGroupReader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.StateTable;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+
+/**
+ * Interface for state de-serialization into {@link StateTable}s by key-group.
+ */
+@Internal
+public interface StateSnapshotKeyGroupReader {
+
+	/**
+	 * Read the data for the specified key-group from the input.
+	 *
+	 * @param div        the input
+	 * @param keyGroupId the key-group to read
+	 * @throws IOException on read related problems
+	 */
+	void readMappingsInKeyGroup(@Nonnull DataInputView div, @Nonnegative int keyGroupId) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
new file mode 100644
index 0000000..6fe50ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Interface to deal with state snapshot and restore of state.
+ * TODO find better name?
+ */
+@Internal
+public interface StateSnapshotRestore {
+
+	/**
+	 * Returns a snapshot of the state.
+	 */
+	@Nonnull
+	StateSnapshot stateSnapshot();
+
+	/**
+	 * This method returns a {@link StateSnapshotKeyGroupReader} that can be used to restore the state on a
+	 * per-key-group basis. This method tries to return a reader for the given version hint.
+	 *
+	 * @param readVersionHint the required version of the state to read.
+	 * @return a reader that reads state by key-groups, for the given read version.
+	 */
+	@Nonnull
+	StateSnapshotKeyGroupReader keyGroupReader(int readVersionHint);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
index 4384eb7..94aed45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -85,11 +85,6 @@ public class TieBreakingPriorityComparator<T> implements Comparator<T>, Priority
 			return ((Comparable<T>) o1).compareTo(o2);
 		}
 
-		// we catch this case before moving to more expensive tie breaks.
-		if (o1.equals(o2)) {
-			return 0;
-		}
-
 		// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
 		// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dbddf00b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 6dc8cf3..b1ad0df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import static org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION;
+
 /**
  * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
  * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
@@ -80,6 +82,30 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
 
 	@Override
 	public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+			bulkPollRelaxedOrder(canConsume, consumer);
+		} else {
+			bulkPollStrictOrder(canConsume, consumer);
+		}
+	}
+
+	private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		if (orderedCache.isEmpty()) {
+			bulkPollStore(canConsume, consumer);
+		} else {
+			while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+				final E next = orderedCache.removeFirst();
+				orderedStore.remove(next);
+				consumer.accept(next);
+			}
+
+			if (orderedCache.isEmpty()) {
+				bulkPollStore(canConsume, consumer);
+			}
+		}
+	}
+
+	private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
 		E element;
 		while ((element = peek()) != null && canConsume.test(element)) {
 			poll();
@@ -87,6 +113,26 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
 		}
 	}
 
+	private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+		try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+			while (iterator.hasNext()) {
+				final E next = iterator.next();
+				if (canConsume.test(next)) {
+					orderedStore.remove(next); // NOTE(review): removes from the store while its own ordered iterator is still open -- confirm the iterator tolerates this
+					consumer.accept(next);
+				} else {
+					orderedCache.add(next); // first element rejected by canConsume: stop consuming, cache it and the following elements until the cache is full
+					while (iterator.hasNext() && !orderedCache.isFull()) {
+						orderedCache.add(iterator.next());
+					}
+					break;
+				}
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Exception while bulk polling store.", e);
+		}
+	}
+
 	@Nullable
 	@Override
 	public E poll() {