Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/02 12:36:31 UTC

[GitHub] asfgit closed pull request #6438: [FLINK-9981] Tune performance of RocksDB implementation

URL: https://github.com/apache/flink/pull/6438

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/rocks_db_configuration.html b/docs/_includes/generated/rocks_db_configuration.html
index 57b95114976..8983f8b41dd 100644
--- a/docs/_includes/generated/rocks_db_configuration.html
+++ b/docs/_includes/generated/rocks_db_configuration.html
@@ -13,9 +13,9 @@
             <td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
         </tr>
         <tr>
-            <td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
+            <td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
             <td style="word-wrap: break-word;">"HEAP"</td>
-            <td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
+            <td>This determines the factory for the timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
         </tr>
     </tbody>
 </table>
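
For reference, the renamed option can also be set programmatically. A minimal sketch, assuming the generic Configuration API (the programmatic route is illustrative only; the PR itself just renames the key):

    import org.apache.flink.configuration.Configuration;

    Configuration config = new Configuration();
    // select the RocksDB-based timer service instead of the default heap-based one
    config.setString("state.backend.rocksdb.timer-service.factory", "rocksdb");
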
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
similarity index 50%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
rename to flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 36a334122c4..33836f0c781 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -16,45 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.util.CloseableIterator;
+package org.apache.flink.core.memory;
 
 import javax.annotation.Nonnull;
 
-import java.util.Comparator;
-import java.util.TreeSet;
-
 /**
- * Simple implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
- * for tests.
+ * Reusable adapter to {@link DataInputView} that operates on given byte-arrays.
  */
-public class TestOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+public class ByteArrayDataInputView extends DataInputViewStreamWrapper {
+
+	@Nonnull
+	private final ByteArrayInputStreamWithPos inStreamWithPos;
 
-	private final TreeSet<T> treeSet;
+	public ByteArrayDataInputView() {
+		super(new ByteArrayInputStreamWithPos());
+		this.inStreamWithPos = (ByteArrayInputStreamWithPos) in;
+	}
 
-	public TestOrderedStore(Comparator<T> comparator) {
-		this.treeSet = new TreeSet<>(comparator);
+	public ByteArrayDataInputView(@Nonnull byte[] buffer) {
+		this(buffer, 0, buffer.length);
 	}
 
-	@Override
-	public void add(@Nonnull T element) {
-		treeSet.add(element);
+	public ByteArrayDataInputView(@Nonnull byte[] buffer, int offset, int length) {
+		this();
+		setData(buffer, offset, length);
 	}
 
-	@Override
-	public void remove(@Nonnull T element) {
-		treeSet.remove(element);
+	public int getPosition() {
+		return inStreamWithPos.getPosition();
 	}
 
-	@Override
-	public int size() {
-		return treeSet.size();
+	public void setPosition(int pos) {
+		inStreamWithPos.setPosition(pos);
 	}
 
-	@Nonnull
-	@Override
-	public CloseableIterator<T> orderedIterator() {
-		return CloseableIterator.adapterForIterator(treeSet.iterator());
+	public void setData(@Nonnull byte[] buffer, int offset, int length) {
+		inStreamWithPos.setBuffer(buffer, offset, length);
 	}
 }
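
The new input view is built for reuse across records. A short usage sketch based on the constructors and setData method above (the byte arrays are hypothetical and IOException handling is omitted):

    byte[] first = {0, 0, 0, 42};
    byte[] second = {0, 0, 0, 7};

    // wrap the first buffer ...
    ByteArrayDataInputView inView = new ByteArrayDataInputView(first, 0, first.length);
    int a = inView.readInt(); // 42

    // ... then point the same instance at the next buffer, avoiding reallocation
    inView.setData(second, 0, second.length);
    int b = inView.readInt(); // 7
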
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java
new file mode 100644
index 00000000000..a96f3d3fef1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataOutputView.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Adapter to {@link DataOutputView} that operates on a byte-array and offers read/write access to the current position.
+ */
+public class ByteArrayDataOutputView extends DataOutputViewStreamWrapper {
+
+	@Nonnull
+	private final ByteArrayOutputStreamWithPos outputStreamWithPos;
+
+	public ByteArrayDataOutputView() {
+		this(64);
+	}
+
+	public ByteArrayDataOutputView(int initialSize) {
+		super(new ByteArrayOutputStreamWithPos(initialSize));
+		this.outputStreamWithPos = (ByteArrayOutputStreamWithPos) out;
+	}
+
+	public void reset() {
+		outputStreamWithPos.reset();
+	}
+
+	@Nonnull
+	public byte[] toByteArray() {
+		return outputStreamWithPos.toByteArray();
+	}
+
+	public int getPosition() {
+		return outputStreamWithPos.getPosition();
+	}
+
+	public void setPosition(int position) {
+		outputStreamWithPos.setPosition(position);
+	}
+
+	@Nonnull
+	public byte[] getInternalBufferReference() {
+		return outputStreamWithPos.getBuf();
+	}
+}
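
The output view follows the same reuse pattern. A minimal sketch using the methods above (IOException handling omitted):

    ByteArrayDataOutputView outView = new ByteArrayDataOutputView(64);
    outView.writeLong(42L); // write methods are inherited from DataOutputViewStreamWrapper
    byte[] serialized = outView.toByteArray();

    // reset and reuse the same instance for the next record
    outView.reset();
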
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index 1447e9661c8..bc81593dea4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -30,20 +30,23 @@
 @Internal
 public class ByteArrayInputStreamWithPos extends InputStream {
 
+	private static final byte[] EMPTY = new byte[0];
+
 	protected byte[] buffer;
 	protected int position;
 	protected int count;
 	protected int mark = 0;
 
+	public ByteArrayInputStreamWithPos() {
+		this(EMPTY);
+	}
+
 	public ByteArrayInputStreamWithPos(byte[] buffer) {
 		this(buffer, 0, buffer.length);
 	}
 
 	public ByteArrayInputStreamWithPos(byte[] buffer, int offset, int length) {
-		this.position = offset;
-		this.buffer = buffer;
-		this.mark = offset;
-		this.count = Math.min(buffer.length, offset + length);
+		setBuffer(buffer, offset, length);
 	}
 
 	@Override
@@ -122,4 +125,11 @@ public void setPosition(int pos) {
 		Preconditions.checkArgument(pos >= 0 && pos <= count, "Position out of bounds.");
 		this.position = pos;
 	}
+
+	public void setBuffer(byte[] buffer, int offset, int length) {
+		this.count = Math.min(buffer.length, offset + length);
+		setPosition(offset);
+		this.buffer = buffer;
+		this.mark = offset;
+	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
index 4797c0a338c..62ad911ab25 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
@@ -81,4 +81,18 @@ public void testSetNegativePosition() throws Exception {
 		thrown.expectMessage("Position out of bounds.");
 		stream.setPosition(-1);
 	}
+
+	@Test
+	public void testSetBuffer() {
+		ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos();
+		Assert.assertEquals(-1, in.read());
+		byte[] testData = new byte[]{0x42, 0x43, 0x44, 0x45};
+		int off = 1;
+		int len = 2;
+		in.setBuffer(testData, off, len);
+		for (int i = 0; i < len; ++i) {
+			Assert.assertEquals(testData[i + off], in.read());
+		}
+		Assert.assertEquals(-1, in.read());
+	}
 }
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 26a7e855c3c..fd0bf913a85 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -59,7 +59,7 @@ backup_config
 change_conf "taskmanager.numberOfTaskSlots" "1" "${NUM_SLOTS}"
 
 if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'rocks' ]; then
-  set_conf "state.backend.rocksdb.timer-service.impl" "rocksdb"
+  set_conf "state.backend.rocksdb.timer-service.factory" "rocksdb"
 fi
 
 setup_flink_slf4j_metric_reporter
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index ab4521cf51f..733ddf6f23a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -115,12 +115,6 @@ under the License.
 			<!-- Version is set in root POM -->
 		</dependency>
 
-		<dependency>
-			<groupId>it.unimi.dsi</groupId>
-			<artifactId>fastutil</artifactId>
-			<version>8.2.1</version>
-		</dependency>
-
 		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-library</artifactId>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
index dc46c8ab637..fb3ee82f984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
@@ -26,8 +26,6 @@
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 /**
  * Interface for a collection that gives in-order access to elements w.r.t. their priority.
@@ -37,16 +35,6 @@
 @Internal
 public interface InternalPriorityQueue<T> {
 
-	/**
-	 * Polls from the top of the queue as long as the queue is not empty and passes the elements to
-	 * {@link Consumer} until a {@link Predicate} rejects an offered element. The rejected element is not
-	 * removed from the queue and becomes the new head.
-	 *
-	 * @param canConsume bulk polling ends once this returns false. The rejected element is not removed and not consumed.
-	 * @param consumer consumer function for elements accepted by canConsume.
-	 */
-	void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer);
-
 	/**
 	 * Retrieves and removes the first element (w.r.t. the order) of this set,
 	 * or returns {@code null} if this set is empty.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
deleted file mode 100644
index 94aed45a03e..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-/**
- * This class is an adapter between {@link PriorityComparator} and a full {@link Comparator} that respects the
- * contract between {@link Comparator#compare(Object, Object)} and {@link Object#equals(Object)}. This is currently
- * needed for implementations of
- * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} that are implemented
- * on top of a data structure that relies on this contract, e.g. a tree set. We should replace this in the near
- * future.
- *
- * @param <T> type of the compared elements.
- */
-public class TieBreakingPriorityComparator<T> implements Comparator<T>, PriorityComparator<T> {
-
-	/** The {@link PriorityComparator} to which we delegate in a first step. */
-	@Nonnull
-	private final PriorityComparator<T> priorityComparator;
-
-	/** Serializer for instances of the compared objects. */
-	@Nonnull
-	private final TypeSerializer<T> serializer;
-
-	/** Stream that we use in serialization. */
-	@Nonnull
-	private final ByteArrayOutputStreamWithPos outStream;
-
-	/** {@link org.apache.flink.core.memory.DataOutputView} around outStream. */
-	@Nonnull
-	private final DataOutputViewStreamWrapper outView;
-
-	public TieBreakingPriorityComparator(
-		@Nonnull PriorityComparator<T> priorityComparator,
-		@Nonnull TypeSerializer<T> serializer,
-		@Nonnull ByteArrayOutputStreamWithPos outStream,
-		@Nonnull DataOutputViewStreamWrapper outView) {
-
-		this.priorityComparator = priorityComparator;
-		this.serializer = serializer;
-		this.outStream = outStream;
-		this.outView = outView;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public int compare(T o1, T o2) {
-
-		// first we compare priority, this should be the most commonly hit case
-		int cmp = priorityComparator.comparePriority(o1, o2);
-
-		if (cmp != 0) {
-			return cmp;
-		}
-
-		// here we start tie breaking and do our best to comply with the compareTo/equals contract, first we try
-		// to simply find an existing way to fully compare.
-		if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
-			return ((Comparable<T>) o1).compareTo(o2);
-		}
-
-		// if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
-		// TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
-		try {
-			outStream.reset();
-			serializer.serialize(o1, outView);
-			int leftLen = outStream.getPosition();
-			serializer.serialize(o2, outView);
-			int rightLen = outStream.getPosition() - leftLen;
-			return compareBytes(outStream.getBuf(), 0, leftLen, leftLen, rightLen);
-		} catch (IOException ex) {
-			throw new FlinkRuntimeException("Serializer problem in comparator.", ex);
-		}
-	}
-
-	@Override
-	public int comparePriority(T left, T right) {
-		return priorityComparator.comparePriority(left, right);
-	}
-
-	public static int compareBytes(byte[] bytes, int offLeft, int leftLen, int offRight, int rightLen) {
-		int maxLen = Math.min(leftLen, rightLen);
-		for (int i = 0; i < maxLen; ++i) {
-			int cmp = bytes[offLeft + i] - bytes[offRight + i];
-			if (cmp != 0) {
-				return cmp;
-			}
-		}
-		return leftLen - rightLen;
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java
new file mode 100644
index 00000000000..f2981bd0b8e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapPriorityQueue.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Abstract base class for heap (object array) based implementations of priority queues, with support for fast deletes
+ * via {@link HeapPriorityQueueElement}.
+ *
+ * @param <T> type of the elements contained in the priority queue.
+ */
+public abstract class AbstractHeapPriorityQueue<T extends HeapPriorityQueueElement>
+	implements InternalPriorityQueue<T> {
+
+	/** The array that represents the heap-organized priority queue. */
+	@Nonnull
+	protected T[] queue;
+
+	/** The current size of the priority queue. */
+	@Nonnegative
+	protected int size;
+
+	@SuppressWarnings("unchecked")
+	public AbstractHeapPriorityQueue(@Nonnegative int minimumCapacity) {
+		this.queue = (T[]) new HeapPriorityQueueElement[getHeadElementIndex() + minimumCapacity];
+		this.size = 0;
+	}
+
+	@Override
+	@Nullable
+	public T poll() {
+		return size() > 0 ? removeInternal(getHeadElementIndex()) : null;
+	}
+
+	@Override
+	@Nullable
+	public T peek() {
+		// References to removed elements are expected to be set to null.
+		return queue[getHeadElementIndex()];
+	}
+
+	@Override
+	public boolean add(@Nonnull T toAdd) {
+		addInternal(toAdd);
+		return toAdd.getInternalIndex() == getHeadElementIndex();
+	}
+
+	@Override
+	public boolean remove(@Nonnull T toRemove) {
+		final int elementIndex = toRemove.getInternalIndex();
+		removeInternal(elementIndex);
+		return elementIndex == getHeadElementIndex();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	@Override
+	public int size() {
+		return size;
+	}
+
+	@Override
+	public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+		if (toAdd == null) {
+			return;
+		}
+
+		resizeForBulkLoad(toAdd.size());
+
+		for (T element : toAdd) {
+			add(element);
+		}
+	}
+
+	@SuppressWarnings({"unchecked"})
+	@Nonnull
+	public <O> O[] toArray(O[] out) {
+		final int heapArrayOffset = getHeadElementIndex();
+		if (out.length < size) {
+			return (O[]) Arrays.copyOfRange(queue, heapArrayOffset, heapArrayOffset + size, out.getClass());
+		} else {
+			System.arraycopy(queue, heapArrayOffset, out, 0, size);
+			if (out.length > size) {
+				out[size] = null;
+			}
+			return out;
+		}
+	}
+
+	/**
+	 * Returns an iterator over the elements in this queue. The iterator
+	 * does not return the elements in any particular order.
+	 *
+	 * @return an iterator over the elements in this queue.
+	 */
+	@Nonnull
+	@Override
+	public CloseableIterator<T> iterator() {
+		return new HeapIterator();
+	}
+
+	/**
+	 * Clears the queue.
+	 */
+	public void clear() {
+		final int arrayOffset = getHeadElementIndex();
+		Arrays.fill(queue, arrayOffset, arrayOffset + size, null);
+		size = 0;
+	}
+
+	protected void resizeForBulkLoad(int totalSize) {
+		if (totalSize > queue.length) {
+			int desiredSize = totalSize + (totalSize >>> 3);
+			resizeQueueArray(desiredSize, totalSize);
+		}
+	}
+
+	protected void resizeQueueArray(int desiredSize, int minRequiredSize) {
+		if (isValidArraySize(desiredSize)) {
+			queue = Arrays.copyOf(queue, desiredSize);
+		} else if (isValidArraySize(minRequiredSize)) {
+			queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
+		} else {
+			throw new OutOfMemoryError("Required minimum heap size " + minRequiredSize +
+				" exceeds maximum size of " + MAX_ARRAY_SIZE + ".");
+		}
+	}
+
+	protected void moveElementToIdx(T element, int idx) {
+		queue[idx] = element;
+		element.setInternalIndex(idx);
+	}
+
+	/**
+	 * Implements how to remove the element at the given index from the queue.
+	 *
+	 * @param elementIndex the index to remove.
+	 * @return the removed element.
+	 */
+	protected abstract T removeInternal(@Nonnegative int elementIndex);
+
+	/**
+	 * Implements how to add an element to the queue.
+	 *
+	 * @param toAdd the element to add.
+	 */
+	protected abstract void addInternal(@Nonnull T toAdd);
+
+	/**
+	 * Returns the start index of the queue elements in the array.
+	 */
+	protected abstract int getHeadElementIndex();
+
+	private static boolean isValidArraySize(int size) {
+		return size >= 0 && size <= MAX_ARRAY_SIZE;
+	}
+
+	/**
+	 * {@link Iterator} implementation for {@link HeapPriorityQueue}.
+	 * {@link Iterator#remove()} is not supported.
+	 */
+	private final class HeapIterator implements CloseableIterator<T> {
+
+		private int runningIdx;
+		private final int endIdx;
+
+		HeapIterator() {
+			this.runningIdx = getHeadElementIndex();
+			this.endIdx = runningIdx + size;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return runningIdx < endIdx;
+		}
+
+		@Override
+		public T next() {
+			if (runningIdx >= endIdx) {
+				throw new NoSuchElementException("Iterator has no next element.");
+			}
+			return queue[runningIdx++];
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+}
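
The fast deletes mentioned in the class javadoc work because each element carries its own position in the backing array, so remove(...) can jump directly to the right slot instead of scanning. A minimal sketch of an element type (TimerElement is a hypothetical name; NOT_CONTAINED is the sentinel constant from HeapPriorityQueueElement):

    class TimerElement implements HeapPriorityQueueElement {

        long timestamp;

        private int internalIndex = NOT_CONTAINED;

        TimerElement(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public int getInternalIndex() {
            return internalIndex;
        }

        @Override
        public void setInternalIndex(int newIndex) {
            this.internalIndex = newIndex;
        }
    }
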
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
deleted file mode 100644
index b1ad0df6d5e..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.Collection;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-import static org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION;
-
-/**
- * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
- * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
- * two different storage types. The first storage is a (potentially slow) ordered set store that manages the ground truth
- * capacity. The cache is used to improve performance of accesses to the underlying store and contains an ordered
- * (partial) view on the top elements in the ordered store. We are currently applying simple write-through to keep cache
- * and store in sync on updates and refill the cache from the store when it is empty and we expect more elements
- * contained in the store.
- *
- * @param <E> the type of the managed elements.
- */
-public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
-
-	/** An ordered set cache that contains a (partial) view on the top elements in the ordered store. */
-	@Nonnull
-	private final OrderedSetCache<E> orderedCache;
-
-	/** A store with ordered set semantics that contains the ground truth of all inserted elements. */
-	@Nonnull
-	private final OrderedSetStore<E> orderedStore;
-
-	/** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
-	private boolean storeOnlyElements;
-
-	/** Management data for the {@link HeapPriorityQueueElement} trait. */
-	private int pqManagedIndex;
-
-	@SuppressWarnings("unchecked")
-	public CachingInternalPriorityQueueSet(
-		@Nonnull OrderedSetCache<E> orderedCache,
-		@Nonnull OrderedSetStore<E> orderedStore) {
-		this.orderedCache = orderedCache;
-		this.orderedStore = orderedStore;
-		// We are careful and set this to true. Could be set to false if we know for sure that the store is empty, but
-		// checking this could be an expensive operation.
-		this.storeOnlyElements = true;
-		this.pqManagedIndex = HeapPriorityQueueElement.NOT_CONTAINED;
-	}
-
-	@Nullable
-	@Override
-	public E peek() {
-
-		checkRefillCacheFromStore();
-
-		return orderedCache.peekFirst();
-	}
-
-	@Override
-	public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
-		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
-			bulkPollRelaxedOrder(canConsume, consumer);
-		} else {
-			bulkPollStrictOrder(canConsume, consumer);
-		}
-	}
-
-	private void bulkPollRelaxedOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
-		if (orderedCache.isEmpty()) {
-			bulkPollStore(canConsume, consumer);
-		} else {
-			while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
-				final E next = orderedCache.removeFirst();
-				orderedStore.remove(next);
-				consumer.accept(next);
-			}
-
-			if (orderedCache.isEmpty()) {
-				bulkPollStore(canConsume, consumer);
-			}
-		}
-	}
-
-	private void bulkPollStrictOrder(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
-		E element;
-		while ((element = peek()) != null && canConsume.test(element)) {
-			poll();
-			consumer.accept(element);
-		}
-	}
-
-	private void bulkPollStore(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
-		try (CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
-			while (iterator.hasNext()) {
-				final E next = iterator.next();
-				if (canConsume.test(next)) {
-					orderedStore.remove(next);
-					consumer.accept(next);
-				} else {
-					orderedCache.add(next);
-					while (iterator.hasNext() && !orderedCache.isFull()) {
-						orderedCache.add(iterator.next());
-					}
-					break;
-				}
-			}
-		} catch (Exception e) {
-			throw new FlinkRuntimeException("Exception while bulk polling store.", e);
-		}
-	}
-
-	@Nullable
-	@Override
-	public E poll() {
-
-		checkRefillCacheFromStore();
-
-		final E first = orderedCache.removeFirst();
-
-		if (first != null) {
-			// write-through sync
-			orderedStore.remove(first);
-		}
-
-		return first;
-	}
-
-	@Override
-	public boolean add(@Nonnull E toAdd) {
-
-		checkRefillCacheFromStore();
-
-		// write-through sync
-		orderedStore.add(toAdd);
-
-		final boolean cacheFull = orderedCache.isFull();
-
-		if ((!cacheFull && !storeOnlyElements) || orderedCache.isInLowerBound(toAdd)) {
-
-			if (cacheFull) {
-				// we drop the element with lowest priority from the cache
-				orderedCache.removeLast();
-				// the dropped element is now only in the store
-				storeOnlyElements = true;
-			}
-
-			orderedCache.add(toAdd);
-			return toAdd.equals(orderedCache.peekFirst());
-		} else {
-			// we only added to the store
-			storeOnlyElements = true;
-			return false;
-		}
-	}
-
-	@Override
-	public boolean remove(@Nonnull E toRemove) {
-
-		checkRefillCacheFromStore();
-
-		boolean newHead = toRemove.equals(orderedCache.peekFirst());
-		// write-through sync
-		orderedStore.remove(toRemove);
-		orderedCache.remove(toRemove);
-		return newHead;
-	}
-
-	@Override
-	public void addAll(@Nullable Collection<? extends E> toAdd) {
-
-		if (toAdd == null) {
-			return;
-		}
-
-		for (E element : toAdd) {
-			add(element);
-		}
-	}
-
-	@Override
-	public int size() {
-		return orderedStore.size();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		checkRefillCacheFromStore();
-		return orderedCache.isEmpty();
-	}
-
-	@Nonnull
-	@Override
-	public CloseableIterator<E> iterator() {
-		if (storeOnlyElements) {
-			return orderedStore.orderedIterator();
-		} else {
-			return orderedCache.orderedIterator();
-		}
-	}
-
-	@Override
-	public int getInternalIndex() {
-		return pqManagedIndex;
-	}
-
-	@Override
-	public void setInternalIndex(int updateIndex) {
-		this.pqManagedIndex = updateIndex;
-	}
-
-	/**
-	 * Refills the cache from the store when the cache is empty and we expect more elements in the store.
-	 *
-	 * TODO: We can think about exploiting the property that the store is already sorted when bulk-filling the cache.
-	 */
-	private void checkRefillCacheFromStore() {
-		if (storeOnlyElements && orderedCache.isEmpty()) {
-			try (final CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
-				while (iterator.hasNext() && !orderedCache.isFull()) {
-					orderedCache.add(iterator.next());
-				}
-				storeOnlyElements = iterator.hasNext();
-			} catch (Exception e) {
-				throw new FlinkRuntimeException("Exception while refilling store from iterator.", e);
-			}
-		}
-	}
-
-	/**
-	 * Interface for an ordered cache with set semantics of elements to be used in
-	 * {@link CachingInternalPriorityQueueSet}. Cache implementations typically have a limited capacity as indicated
-	 * via the {@link #isFull()} method.
-	 *
-	 * @param <E> the type of contained elements.
-	 */
-	public interface OrderedSetCache<E> {
-
-		/**
-		 * Adds the given element to the cache (if not yet contained). This method should only be called if the cache
-		 * is not full.
-		 * @param element element to add to the cache.
-		 */
-		void add(@Nonnull E element);
-
-		/**
-		 * Removes the given element from the cache (if contained).
-		 * @param element element to remove from the cache.
-		 */
-		void remove(@Nonnull E element);
-
-		/**
-		 * Returns <code>true</code> if the cache is full and no more elements can be added.
-		 */
-		boolean isFull();
-
-		/**
-		 * Returns <code>true</code> if the cache is empty, i.e. contains no elements.
-		 */
-		boolean isEmpty();
-
-		/**
-		 * Returns true if the element compares smaller than the currently largest element in the cache.
-		 */
-		boolean isInLowerBound(@Nonnull E toCheck);
-
-		/**
-		 * Removes and returns the first (smallest) element from the cache.
-		 */
-		@Nullable
-		E removeFirst();
-
-		/**
-		 * Removes and returns the last (largest) element from the cache.
-		 */
-		@Nullable
-		E removeLast();
-
-		/**
-		 * Returns the first (smallest) element from the cache (without removing it).
-		 */
-		@Nullable
-		E peekFirst();
-
-		/**
-		 * Returns the last (largest) element from the cache (without removing it).
-		 */
-		@Nullable
-		E peekLast();
-
-		/**
-		 * Returns an iterator over the cache that returns elements in order. The iterator must be closed by the client
-		 * after usage.
-		 */
-		@Nonnull
-		CloseableIterator<E> orderedIterator();
-	}
-
-	/**
-	 * Interface for an ordered store with set semantics of elements to be used in
-	 * {@link CachingInternalPriorityQueueSet}. Stores are assumed to have (practically) unlimited capacity, but their
-	 * operations could all be expensive.
-	 *
-	 * @param <E> the type of contained elements.
-	 */
-	public interface OrderedSetStore<E> {
-
-		/**
-		 * Adds the given element to the store (if not yet contained).
-		 * @param element element to add to the store.
-		 */
-		void add(@Nonnull E element);
-
-		/**
-		 * Removes the given element from the store (if contained).
-		 * @param element element to remove from the store.
-		 */
-		void remove(@Nonnull E element);
-
-		/**
-		 * Returns the number of elements in the store.
-		 */
-		@Nonnegative
-		int size();
-
-		/**
-		 * Returns an iterator over the store that returns elements in order. The iterator must be closed by the client
-		 * after usage.
-		 */
-		@Nonnull
-		CloseableIterator<E> orderedIterator();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
index 22b24197e84..2dd57010fea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -18,22 +18,10 @@
 
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.PriorityComparator;
-import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
-
-import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
 
 /**
  * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
@@ -51,7 +39,7 @@
  * @param <T> type of the contained elements.
  */
 public class HeapPriorityQueue<T extends HeapPriorityQueueElement>
-	implements InternalPriorityQueue<T> {
+	extends AbstractHeapPriorityQueue<T> {
 
 	/**
 	 * The index of the head element in the array that represents the heap.
@@ -61,17 +49,8 @@
 	/**
 	 * Comparator for the priority of contained elements.
 	 */
-	private final PriorityComparator<T> elementPriorityComparator;
-
-	/**
-	 * The array that represents the heap-organized priority queue.
-	 */
-	private T[] queue;
-
-	/**
-	 * The current size of the priority queue.
-	 */
-	private int size;
+	@Nonnull
+	protected final PriorityComparator<T> elementPriorityComparator;
 
 	/**
 	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
@@ -83,124 +62,31 @@
 	public HeapPriorityQueue(
 		@Nonnull PriorityComparator<T> elementPriorityComparator,
 		@Nonnegative int minimumCapacity) {
-
+		super(minimumCapacity);
 		this.elementPriorityComparator = elementPriorityComparator;
-		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
 	}
 
-	@Override
-	public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
-		T element;
-		while ((element = peek()) != null && canConsume.test(element)) {
-			poll();
-			consumer.accept(element);
+	public void adjustModifiedElement(@Nonnull T element) {
+		final int elementIndex = element.getInternalIndex();
+		if (element == queue[elementIndex]) {
+			adjustElementAtIndex(element, elementIndex);
 		}
 	}
 
 	@Override
-	@Nullable
-	public T poll() {
-		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
-	}
-
-	@Override
-	@Nullable
-	public T peek() {
-		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
-	}
-
-	/**
-	 * Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
-	 *
-	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
-	 * Only returns <code>false</code> iff the head element was not changed by this operation.
-	 */
-	@Override
-	public boolean add(@Nonnull T toAdd) {
-		return addInternal(toAdd);
-	}
-
-	/**
-	 * This remove is based on object identity, not the result of equals. We use the objects managed index to find
-	 * the instance in the queue array.
-	 *
-	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
-	 * Only returns <code>false</code> iff the head element was not changed by this operation.
-	 */
-	@Override
-	public boolean remove(@Nonnull T toRemove) {
-		return removeInternal(toRemove);
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return size() == 0;
-	}
-
-	@Override
-	@Nonnegative
-	public int size() {
-		return size;
-	}
-
-	public void clear() {
-		size = 0;
-		Arrays.fill(queue, null);
-	}
-
-	@SuppressWarnings({"unchecked"})
-	@Nonnull
-	public <O> O[] toArray(O[] out) {
-		if (out.length < size) {
-			return (O[]) Arrays.copyOfRange(queue, QUEUE_HEAD_INDEX, QUEUE_HEAD_INDEX + size, out.getClass());
-		} else {
-			System.arraycopy(queue, QUEUE_HEAD_INDEX, out, 0, size);
-			if (out.length > size) {
-				out[size] = null;
-			}
-			return out;
-		}
-	}
-
-	/**
-	 * Returns an iterator over the elements in this queue. The iterator
-	 * does not return the elements in any particular order.
-	 *
-	 * @return an iterator over the elements in this queue.
-	 */
-	@Nonnull
-	public CloseableIterator<T> iterator() {
-		return new HeapIterator();
+	protected int getHeadElementIndex() {
+		return QUEUE_HEAD_INDEX;
 	}
 
 	@Override
-	public void addAll(@Nullable Collection<? extends T> restoredElements) {
-
-		if (restoredElements == null) {
-			return;
-		}
-
-		resizeForBulkLoad(restoredElements.size());
-
-		for (T element : restoredElements) {
-			add(element);
-		}
-	}
-
-	private boolean addInternal(@Nonnull T element) {
+	protected void addInternal(@Nonnull T element) {
 		final int newSize = increaseSizeByOne();
 		moveElementToIdx(element, newSize);
 		siftUp(newSize);
-		return element.getInternalIndex() == QUEUE_HEAD_INDEX;
 	}
 
-	private boolean removeInternal(@Nonnull T elementToRemove) {
-		final int elementIndex = elementToRemove.getInternalIndex();
-		removeElementAtIndex(elementIndex);
-		return elementIndex == QUEUE_HEAD_INDEX;
-	}
-
-	private T removeElementAtIndex(int removeIdx) {
+	@Override
+	protected T removeInternal(int removeIdx) {
 		T[] heap = this.queue;
 		T removedValue = heap[removeIdx];
 
@@ -220,13 +106,6 @@ private T removeElementAtIndex(int removeIdx) {
 		return removedValue;
 	}
 
-	public void adjustModifiedElement(@Nonnull T element) {
-		final int elementIndex = element.getInternalIndex();
-		if (element == queue[elementIndex]) {
-			adjustElementAtIndex(element, elementIndex);
-		}
-	}
-
 	private void adjustElementAtIndex(T element, int index) {
 		siftDown(index);
 		if (queue[index] == element) {
@@ -285,11 +164,6 @@ private boolean isElementPriorityLessThen(T a, T b) {
 		return elementPriorityComparator.comparePriority(a, b) < 0;
 	}
 
-	private void moveElementToIdx(T element, int idx) {
-		queue[idx] = element;
-		element.setInternalIndex(idx);
-	}
-
 	private int increaseSizeByOne() {
 		final int oldArraySize = queue.length;
 		final int minRequiredNewSize = ++size;
@@ -300,56 +174,4 @@ private int increaseSizeByOne() {
 		// TODO implement shrinking as well?
 		return minRequiredNewSize;
 	}
-
-	private void resizeForBulkLoad(int totalSize) {
-		if (totalSize > queue.length) {
-			int desiredSize = totalSize + (totalSize >>> 3);
-			resizeQueueArray(desiredSize, totalSize);
-		}
-	}
-
-	private void resizeQueueArray(int desiredSize, int minRequiredSize) {
-		if (isValidArraySize(desiredSize)) {
-			queue = Arrays.copyOf(queue, desiredSize);
-		} else if (isValidArraySize(minRequiredSize)) {
-			queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
-		} else {
-			throw new OutOfMemoryError("Required minimum heap size " + minRequiredSize +
-				" exceeds maximum size of " + MAX_ARRAY_SIZE + ".");
-		}
-	}
-
-	private static boolean isValidArraySize(int size) {
-		return size >= 0 && size <= MAX_ARRAY_SIZE;
-	}
-
-	/**
-	 * {@link Iterator} implementation for {@link HeapPriorityQueue}.
-	 * {@link Iterator#remove()} is not supported.
-	 */
-	private class HeapIterator implements CloseableIterator<T> {
-
-		private int iterationIdx;
-
-		HeapIterator() {
-			this.iterationIdx = QUEUE_HEAD_INDEX - 1;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return iterationIdx < size;
-		}
-
-		@Override
-		public T next() {
-			if (iterationIdx >= size) {
-				throw new NoSuchElementException("Iterator has no next element.");
-			}
-			return queue[++iterationIdx];
-		}
-
-		@Override
-		public void close() {
-		}
-	}
 }
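
With adjustModifiedElement exposed on HeapPriorityQueue, callers can mutate an element's priority in place and then restore the heap invariant. A short sketch reusing the hypothetical TimerElement from above:

    HeapPriorityQueue<TimerElement> queue = new HeapPriorityQueue<>(
        (left, right) -> Long.compare(left.timestamp, right.timestamp), 16);

    TimerElement element = new TimerElement(100L);
    queue.add(element);

    // change the priority in place, then re-establish the heap ordering
    element.timestamp = 50L;
    queue.adjustModifiedElement(element);
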
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
index 79f319c8d8a..c0215acb06c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
@@ -106,11 +106,7 @@ public HeapPriorityQueueSet(
 	@Nullable
 	public T poll() {
 		final T toRemove = super.poll();
-		if (toRemove != null) {
-			return getDedupMapForElement(toRemove).remove(toRemove);
-		} else {
-			return null;
-		}
+		return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
index 5fd67f07be1..b2b28437505 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
@@ -68,16 +68,6 @@ public HeapPriorityQueueSnapshotRestoreWrapper(
 	@Override
 	public StateSnapshot stateSnapshot() {
 		final T[] queueDump = (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]);
-
-		final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
-
-		// turn the flat copy into a deep copy if required.
-		if (!elementSerializer.isImmutableType()) {
-			for (int i = 0; i < queueDump.length; ++i) {
-				queueDump[i] = elementSerializer.copy(queueDump[i]);
-			}
-		}
-
 		return new HeapPriorityQueueStateSnapshot<>(
 			queueDump,
 			keyExtractorFunction,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
index d8b0a5a6628..79787644e0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -35,8 +35,6 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.function.Consumer;
-import java.util.function.Predicate;
 
 /**
  * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
@@ -49,11 +47,9 @@
 public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
 	implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> {
 
-	static final boolean ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION = false;
-
 	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
 	@Nonnull
-	private final HeapPriorityQueue<PQ> heapOfkeyGroupedHeaps;
+	private final HeapPriorityQueue<PQ> heapOfKeyGroupedHeaps;
 
 	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
 	@Nonnull
@@ -83,55 +79,30 @@ public KeyGroupPartitionedPriorityQueue(
 		this.totalKeyGroups = totalKeyGroups;
 		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
 		this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
-		this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>(
+		this.heapOfKeyGroupedHeaps = new HeapPriorityQueue<>(
 			new InternalPriorityQueueComparator<>(elementPriorityComparator),
 			keyGroupRange.getNumberOfKeyGroups());
 		for (int i = 0; i < keyGroupedHeaps.length; i++) {
 			final PQ keyGroupSubHeap =
-				orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementPriorityComparator);
+				orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, keyExtractor, elementPriorityComparator);
 			keyGroupedHeaps[i] = keyGroupSubHeap;
-			heapOfkeyGroupedHeaps.add(keyGroupSubHeap);
-		}
-	}
-
-	@Override
-	public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
-		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
-			bulkPollRelaxedOrder(canConsume, consumer);
-		} else {
-			bulkPollStrictOrder(canConsume, consumer);
-		}
-	}
-
-	private void bulkPollRelaxedOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
-		PQ headList = heapOfkeyGroupedHeaps.peek();
-		while (headList.peek() != null && canConsume.test(headList.peek())) {
-			headList.bulkPoll(canConsume, consumer);
-			heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
-		}
-	}
-
-	private void bulkPollStrictOrder(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
-		T element;
-		while ((element = peek()) != null && canConsume.test(element)) {
-			poll();
-			consumer.accept(element);
+			heapOfKeyGroupedHeaps.add(keyGroupSubHeap);
 		}
 	}
 
 	@Nullable
 	@Override
 	public T poll() {
-		final PQ headList = heapOfkeyGroupedHeaps.peek();
+		final PQ headList = heapOfKeyGroupedHeaps.peek();
 		final T head = headList.poll();
-		heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
+		heapOfKeyGroupedHeaps.adjustModifiedElement(headList);
 		return head;
 	}
 
 	@Nullable
 	@Override
 	public T peek() {
-		return heapOfkeyGroupedHeaps.peek().peek();
+		return heapOfKeyGroupedHeaps.peek().peek();
 	}
 
 	@Override
@@ -140,7 +111,7 @@ public boolean add(@Nonnull T toAdd) {
 
 		// the branch checks if the head element has (potentially) changed.
 		if (list.add(toAdd)) {
-			heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+			heapOfKeyGroupedHeaps.adjustModifiedElement(list);
 			// could we have a new head?
 			return toAdd.equals(peek());
 		} else {
@@ -157,7 +128,7 @@ public boolean remove(@Nonnull T toRemove) {
 
 		// the branch checks if the head element has (potentially) changed.
 		if (list.remove(toRemove)) {
-			heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+			heapOfKeyGroupedHeaps.adjustModifiedElement(list);
 			// could we have a new head?
 			return toRemove.equals(oldHead);
 		} else {
@@ -330,6 +301,7 @@ public int comparePriority(Q o1, Q o2) {
 		PQS create(
 			@Nonnegative int keyGroupId,
 			@Nonnegative int numKeyGroups,
+			@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
 			@Nonnull PriorityComparator<T> elementPriorityComparator);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
deleted file mode 100644
index 14c281effc9..00000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.Preconditions;
-
-import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.Comparator;
-
-/**
- * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} based
- * on an AVL-Tree. We chose the implementation from fastutil over JDK for performance reasons.
- *
- * <p>Maintainer notes: We can consider the following potential performance improvements. First, we could introduce a
- * bulk-load method to OrderedSetCache to exploit the fact that adding from an OrderedSetStore is already happening in
- * sorted order, e.g. there are more efficient ways to construct search trees from sorted elements. Second, we could
- * replace the internal AVL-Tree with an extended variant of {@link HeapPriorityQueueSet} that is organized as a
- * Min-Max-Heap.
- *
- * @param <E> type of the contained elements.
- */
-public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
-
-	/** The tree is used to store cached elements. */
-	@Nonnull
-	private final ObjectAVLTreeSet<E> avlTree;
-
-	/** The element comparator. */
-	@Nonnull
-	private final Comparator<E> elementComparator;
-
-	/** The maximum capacity of the cache. */
-	@Nonnegative
-	private final int capacity;
-
-	/**
-	 * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
-	 * @param elementComparator comparator for the cached elements.
-	 * @param capacity the capacity of the cache. Must be > 0.
-	 */
-	public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) {
-		Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0.");
-		this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
-		this.elementComparator = elementComparator;
-		this.capacity = capacity;
-	}
-
-	@Override
-	public void add(@Nonnull E element) {
-		assert !isFull();
-		avlTree.add(element);
-	}
-
-	@Override
-	public void remove(@Nonnull E element) {
-		avlTree.remove(element);
-	}
-
-	@Override
-	public boolean isFull() {
-		return avlTree.size() == capacity;
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return avlTree.isEmpty();
-	}
-
-	@Override
-	public boolean isInLowerBound(@Nonnull E toCheck) {
-		return avlTree.isEmpty() || elementComparator.compare(peekLast(), toCheck) > 0;
-	}
-
-	@Nullable
-	@Override
-	public E removeFirst() {
-		if (avlTree.isEmpty()) {
-			return null;
-		}
-		final E first = avlTree.first();
-		avlTree.remove(first);
-		return first;
-	}
-
-	@Nullable
-	@Override
-	public E removeLast() {
-		if (avlTree.isEmpty()) {
-			return null;
-		}
-		final E last = avlTree.last();
-		avlTree.remove(last);
-		return last;
-	}
-
-	@Nullable
-	@Override
-	public E peekFirst() {
-		return !avlTree.isEmpty() ? avlTree.first() : null;
-	}
-
-	@Nullable
-	@Override
-	public E peekLast() {
-		return !avlTree.isEmpty() ? avlTree.last() : null;
-	}
-
-	@Nonnull
-	@Override
-	public CloseableIterator<E> orderedIterator() {
-		return CloseableIterator.adapterForIterator(avlTree.iterator());
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index 0cd551ca0e5..510d2774f6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -21,13 +21,17 @@
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,6 +39,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -53,14 +58,15 @@
 	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
 	protected static final PriorityComparator<TestElement> TEST_ELEMENT_PRIORITY_COMPARATOR =
 		(left, right) -> Long.compare(left.getPriority(), right.getPriority());
-	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR = (o1, o2) -> {
-		int priorityCmp = TEST_ELEMENT_PRIORITY_COMPARATOR.comparePriority(o1, o2);
-		if (priorityCmp != 0) {
-			return priorityCmp;
-		}
-		// to fully comply with compareTo/equals contract.
-		return Long.compare(o1.getKey(), o2.getKey());
-	};
+	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR = new TestElementComparator();
+
+	protected Comparator<Long> getTestElementPriorityComparator() {
+		return Long::compareTo;
+	}
+
+	private long getHighestPriorityValueForComparator() {
+		return getTestElementPriorityComparator().compare(-1L, 1L) > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
+	}
 
 	protected static void insertRandomElements(
 		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
@@ -107,13 +113,14 @@ protected static void insertRandomElements(
 	public void testPeekPollOrder() {
 		final int initialCapacity = 4;
 		final int testSize = 1000;
+		final Comparator<Long> comparator = getTestElementPriorityComparator();
 		InternalPriorityQueue<TestElement> priorityQueue =
 			newPriorityQueue(initialCapacity);
 		HashSet<TestElement> checkSet = new HashSet<>(testSize);
 
 		insertRandomElements(priorityQueue, checkSet, testSize);
 
-		long lastPriorityValue = Long.MIN_VALUE;
+		long lastPriorityValue = getHighestPriorityValueForComparator();
 		int lastSize = priorityQueue.size();
 		Assert.assertEquals(testSize, lastSize);
 		TestElement testElement;
@@ -122,7 +129,7 @@ public void testPeekPollOrder() {
 			Assert.assertEquals(lastSize, priorityQueue.size());
 			Assert.assertEquals(testElement, priorityQueue.poll());
 			Assert.assertTrue(checkSet.remove(testElement));
-			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+			Assert.assertTrue(comparator.compare(testElement.getPriority(), lastPriorityValue) >= 0);
 			lastPriorityValue = testElement.getPriority();
 			--lastSize;
 		}
@@ -136,7 +143,7 @@ public void testPeekPollOrder() {
 	public void testRemoveInsertMixKeepsOrder() {
 
 		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
-
+		final Comparator<Long> comparator = getTestElementPriorityComparator();
 		final ThreadLocalRandom random = ThreadLocalRandom.current();
 		final int testSize = 300;
 		final int addCounterMax = testSize / 4;
@@ -148,6 +155,8 @@ public void testRemoveInsertMixKeepsOrder() {
 		// check that the whole set is still in order
 		while (!checkSet.isEmpty()) {
 
+			final long highestPrioValue = getHighestPriorityValueForComparator();
+
 			Iterator<TestElement> iterator = checkSet.iterator();
 			TestElement element = iterator.next();
 			iterator.remove();
@@ -160,16 +169,23 @@ public void testRemoveInsertMixKeepsOrder() {
 				priorityQueue.remove(element);
 			}
 
-			long lastPriorityValue = removesHead ? element.getPriority() : Long.MIN_VALUE;
+			long currentPriorityWatermark;
+
+			// test some bulk polling from time to time
+			if (removesHead) {
+				currentPriorityWatermark = element.getPriority();
+			} else {
+				currentPriorityWatermark = highestPrioValue;
+			}
 
 			while ((element = priorityQueue.poll()) != null) {
-				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
-				lastPriorityValue = element.getPriority();
+				Assert.assertTrue(comparator.compare(element.getPriority(), currentPriorityWatermark) >= 0);
+				currentPriorityWatermark = element.getPriority();
 				if (--iterationsTillNextAdds == 0) {
 					// some random adds
 					iterationsTillNextAdds = random.nextInt(addCounterMax);
 					insertRandomElements(priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3));
-					lastPriorityValue = priorityQueue.peek().getPriority();
+					currentPriorityWatermark = priorityQueue.peek().getPriority();
 				}
 			}
 
@@ -182,6 +198,7 @@ public void testRemoveInsertMixKeepsOrder() {
 	@Test
 	public void testPoll() {
 		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+		final Comparator<Long> comparator = getTestElementPriorityComparator();
 
 		Assert.assertNull(priorityQueue.poll());
 
@@ -189,12 +206,12 @@ public void testPoll() {
 		HashSet<TestElement> checkSet = new HashSet<>(testSize);
 		insertRandomElements(priorityQueue, checkSet, testSize);
 
-		long lastPriorityValue = Long.MIN_VALUE;
+		long lastPriorityValue = getHighestPriorityValueForComparator();
 		while (!priorityQueue.isEmpty()) {
 			TestElement removed = priorityQueue.poll();
 			Assert.assertNotNull(removed);
 			Assert.assertTrue(checkSet.remove(removed));
-			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+			Assert.assertTrue(comparator.compare(removed.getPriority(), lastPriorityValue) >= 0);
 			lastPriorityValue = removed.getPriority();
 		}
 		Assert.assertTrue(checkSet.isEmpty());
@@ -286,18 +303,21 @@ public void testAdd() {
 		InternalPriorityQueue<TestElement> priorityQueue =
 			newPriorityQueue(1);
 
-		TestElement lowPrioElement = new TestElement(4711L, 42L);
-		TestElement highPrioElement = new TestElement(815L, 23L);
-		Assert.assertTrue(priorityQueue.add(lowPrioElement));
+		final List<TestElement> testElements =
+			Arrays.asList(new TestElement(4711L, 42L), new TestElement(815L, 23L));
+
+		testElements.sort((l, r) -> getTestElementPriorityComparator().compare(r.priority, l.priority));
+
+		Assert.assertTrue(priorityQueue.add(testElements.get(0)));
 		if (testSetSemanticsAgainstDuplicateElements()) {
-			priorityQueue.add(lowPrioElement.deepCopy());
+			priorityQueue.add(testElements.get(0).deepCopy());
 		}
 		Assert.assertEquals(1, priorityQueue.size());
-		Assert.assertTrue(priorityQueue.add(highPrioElement));
+		Assert.assertTrue(priorityQueue.add(testElements.get(1)));
 		Assert.assertEquals(2, priorityQueue.size());
-		Assert.assertEquals(highPrioElement, priorityQueue.poll());
+		Assert.assertEquals(testElements.get(1), priorityQueue.poll());
 		Assert.assertEquals(1, priorityQueue.size());
-		Assert.assertEquals(lowPrioElement, priorityQueue.poll());
+		Assert.assertEquals(testElements.get(0), priorityQueue.poll());
 		Assert.assertEquals(0, priorityQueue.size());
 	}
 
@@ -378,6 +398,14 @@ public int hashCode() {
 		public TestElement deepCopy() {
 			return new TestElement(key, priority);
 		}
+
+		@Override
+		public String toString() {
+			return "TestElement{" +
+				"key=" + key +
+				", priority=" + priority +
+				'}';
+		}
 	}
 
 	/**
@@ -471,4 +499,27 @@ public TypeSerializerConfigSnapshot snapshotConfiguration() {
 			throw new UnsupportedOperationException();
 		}
 	}
+
+	/**
+	 * Comparator for test elements, operating on the serialized bytes of the elements.
+	 */
+	protected static class TestElementComparator implements Comparator<TestElement> {
+
+		@Override
+		public int compare(TestElement o1, TestElement o2) {
+
+			ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
+			DataOutputViewStreamWrapper ow = new DataOutputViewStreamWrapper(os);
+			try {
+				TestElementSerializer.INSTANCE.serialize(o1, ow);
+				byte[] a1 = os.toByteArray();
+				os.reset();
+				TestElementSerializer.INSTANCE.serialize(o2, ow);
+				byte[] a2 = os.toByteArray();
+				return UnsignedBytes.lexicographicalComparator().compare(a1, a2);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
 }
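
For context on the new TestElementComparator above: it only orders elements correctly if the lexicographic order of the serialized bytes agrees with the logical priority order. A minimal, self-contained sketch of why this holds, assuming (as the MathUtils import suggests) that the serializer writes values big-endian with the sign bit flipped; the class and helper names below are illustrative only:

import java.nio.ByteBuffer;

public class ByteOrderSketch {

	// Assumption: priorities are encoded big-endian with the sign bit flipped,
	// which is what MathUtils.flipSignBit enables for byte-ordered serializers.
	static byte[] serialize(long priority) {
		return ByteBuffer.allocate(Long.BYTES).putLong(priority ^ Long.MIN_VALUE).array();
	}

	// Unsigned lexicographic comparison, the contract provided by
	// UnsignedBytes.lexicographicalComparator().
	static int compareBytes(byte[] a, byte[] b) {
		for (int i = 0; i < Math.min(a.length, b.length); i++) {
			int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
			if (cmp != 0) {
				return cmp;
			}
		}
		return Integer.compare(a.length, b.length);
	}

	public static void main(String[] args) {
		long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
		for (long x : samples) {
			for (long y : samples) {
				if (Integer.signum(Long.compare(x, y))
						!= Integer.signum(compareBytes(serialize(x), serialize(y)))) {
					throw new AssertionError(x + " vs " + y);
				}
			}
		}
		System.out.println("lexicographic byte order matches logical priority order");
	}
}
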
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
deleted file mode 100644
index 79fd5566a9a..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.state.heap;
-
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
-
-/**
- * Test for {@link CachingInternalPriorityQueueSet}.
- */
-public abstract class CachingInternalPriorityQueueSetTestBase extends InternalPriorityQueueTestBase {
-
-	@Override
-	protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
-		final CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache = createOrderedSetCache();
-		final CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store = createOrderedSetStore();
-		return new CachingInternalPriorityQueueSet<>(cache, store);
-	}
-
-	@Override
-	protected boolean testSetSemanticsAgainstDuplicateElements() {
-		return true;
-	}
-
-	protected abstract CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore();
-
-	protected abstract CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache();
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
index d348e10458f..175c6153a0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
@@ -19,6 +19,11 @@
 
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.PriorityComparator;
+
+import javax.annotation.Nonnull;
 
 /**
  * Test for {@link KeyGroupPartitionedPriorityQueue}.
@@ -34,20 +39,46 @@
 			KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups());
 	}
 
-	protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
-			TestElement, CachingInternalPriorityQueueSet<TestElement>> newFactory(int initialCapacity) {
+	private KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+			TestElement, KeyGroupHeapPQSet<TestElement>> newFactory(int initialCapacity) {
 
-		return (keyGroupId, numKeyGroups, elementComparator) -> {
-			CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache =
-				new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
-			CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store =
-				new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR);
-			return new CachingInternalPriorityQueueSet<>(cache, store);
-		};
+		return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) ->
+			new KeyGroupHeapPQSet<>(
+				elementComparator,
+				keyExtractorFunction,
+				initialCapacity,
+				KeyGroupRange.of(keyGroupId, keyGroupId),
+				numKeyGroups);
 	}
 
 	@Override
 	protected boolean testSetSemanticsAgainstDuplicateElements() {
 		return true;
 	}
+
+	private static class KeyGroupHeapPQSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueueSet<T>
+		implements HeapPriorityQueueElement {
+
+		private int internalIndex;
+
+		public KeyGroupHeapPQSet(
+			@Nonnull PriorityComparator<T> elementPriorityComparator,
+			@Nonnull KeyExtractorFunction<T> keyExtractor,
+			int minimumCapacity,
+			@Nonnull KeyGroupRange keyGroupRange,
+			int totalNumberOfKeyGroups) {
+			super(elementPriorityComparator, keyExtractor, minimumCapacity, keyGroupRange, totalNumberOfKeyGroups);
+			this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+		}
+
+		@Override
+		public int getInternalIndex() {
+			return internalIndex;
+		}
+
+		@Override
+		public void setInternalIndex(int newIndex) {
+			internalIndex = newIndex;
+		}
+	}
 }
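
For context: KeyGroupPartitionedPriorityQueue manages its per-key-group queues inside a heap of queues, which is why the KeyGroupHeapPQSet wrapper above must itself implement HeapPriorityQueueElement. A minimal sketch of that contract (IndexedElement is a hypothetical name):

import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;

public class IndexedElement implements HeapPriorityQueueElement {

	// NOT_CONTAINED marks elements that are currently in no heap.
	private int index = HeapPriorityQueueElement.NOT_CONTAINED;

	@Override
	public int getInternalIndex() {
		return index;
	}

	@Override
	public void setInternalIndex(int newIndex) {
		// The owning heap updates this on every sift-up/sift-down, so that
		// removing or repositioning an element needs no linear scan.
		index = newIndex;
	}
}
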
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
deleted file mode 100644
index 1d627edc883..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.state.heap;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Random;
-import java.util.TreeSet;
-
-/**
- * Test base for instances of
- * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache}.
- */
-public abstract class OrderedSetCacheTestBase {
-
-	@Test
-	public void testOrderedSetCacheContract() {
-		final Random random = new Random(0x42);
-		final int capacity = 5000;
-		final int keySpaceUpperBound = 100 * capacity;
-		final TreeSet<Integer> checkSet = new TreeSet<>(Integer::compareTo);
-		final CachingInternalPriorityQueueSet.OrderedSetCache<Integer> testInstance = createInstance(capacity);
-
-		Assert.assertTrue(testInstance.isEmpty());
-
-		while (checkSet.size() < capacity) {
-			Assert.assertEquals(checkSet.size() >= capacity, testInstance.isFull());
-			if (!checkSet.isEmpty() && random.nextInt(10) == 0) {
-				final int toDelete = pickContainedRandomElement(checkSet, random);
-				Assert.assertTrue(checkSet.remove(toDelete));
-				testInstance.remove(toDelete);
-			} else {
-				final int randomValue = random.nextInt(keySpaceUpperBound);
-				checkSet.add(randomValue);
-				testInstance.add(randomValue);
-			}
-			Assert.assertEquals(checkSet.isEmpty(), testInstance.isEmpty());
-
-			Assert.assertEquals(checkSet.first(), testInstance.peekFirst());
-			Assert.assertEquals(checkSet.last(), testInstance.peekLast());
-
-			Assert.assertFalse(testInstance.isInLowerBound(checkSet.last()));
-			Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 1));
-		}
-
-		Assert.assertTrue(testInstance.isFull());
-		Assert.assertFalse(testInstance.isInLowerBound(checkSet.last()));
-		Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 1));
-
-		testInstance.remove(pickNotContainedRandomElement(checkSet, random, keySpaceUpperBound));
-		Assert.assertTrue(testInstance.isFull());
-
-		int containedKey = pickContainedRandomElement(checkSet, random);
-
-		Assert.assertTrue(checkSet.remove(containedKey));
-		testInstance.remove(containedKey);
-
-		Assert.assertFalse(testInstance.isFull());
-
-		for (int i = 0; i < capacity; ++i) {
-			if (random.nextInt(1) == 0) {
-				Assert.assertEquals(checkSet.pollFirst(), testInstance.removeFirst());
-			} else {
-				Assert.assertEquals(checkSet.pollLast(), testInstance.removeLast());
-			}
-		}
-
-		Assert.assertFalse(testInstance.isFull());
-		Assert.assertTrue(testInstance.isEmpty());
-	}
-
-	private int pickNotContainedRandomElement(TreeSet<Integer> checkSet, Random random, int upperBound) {
-		int notContainedKey;
-		do {
-			notContainedKey = random.nextInt(upperBound);
-		} while (checkSet.contains(notContainedKey));
-		return notContainedKey;
-	}
-
-	private int pickContainedRandomElement(TreeSet<Integer> checkSet, Random random) {
-		assert !checkSet.isEmpty();
-		return checkSet.ceiling(1 + random.nextInt(checkSet.last()));
-	}
-
-	protected abstract CachingInternalPriorityQueueSet.OrderedSetCache<Integer> createInstance(int capacity);
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
deleted file mode 100644
index 4c4eb460ae3..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-/**
- * Test for {@link CachingInternalPriorityQueueSet}.
- */
-public class SimpleCachingInternalPriorityQueueSetTest extends CachingInternalPriorityQueueSetTestBase {
-
-	@Override
-	protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore() {
-		return new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR);
-	}
-
-	@Override
-	protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache() {
-		return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 3);
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
deleted file mode 100644
index cfe823effed..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.heap;
-
-/**
- * Test for {@link TreeOrderedSetCache}.
- */
-public class TreeOrderedSetCacheTest extends OrderedSetCacheTestBase {
-
-	@Override
-	protected CachingInternalPriorityQueueSet.OrderedSetCache<Integer> createInstance(int capacity) {
-		return new TreeOrderedSetCache<>(Integer::compareTo, capacity);
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
new file mode 100644
index 00000000000..68b5b5fdee3
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBCachingPriorityQueueSet.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import static org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache.LEXICOGRAPHIC_BYTE_COMPARATOR;
+
+/**
+ * A priority queue with set semantics, implemented on top of RocksDB. This uses a {@link TreeSet} to cache the bytes
+ * of up to the first n elements from RocksDB in memory to reduce interaction with RocksDB, in particular seek
+ * operations. The cache uses a simple write-through policy.
+ *
+ * @param <E> the type of the contained elements in the queue.
+ */
+public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
+	implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+	/** Serialized empty value to insert into RocksDB. */
+	private static final byte[] DUMMY_BYTES = new byte[] {};
+
+	/** The RocksDB instance that serves as store. */
+	@Nonnull
+	private final RocksDB db;
+
+	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
+	@Nonnull
+	private final ColumnFamilyHandle columnFamilyHandle;
+
+	/**
+	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+	 * aligned with their logical order.
+	 */
+	@Nonnull
+	private final TypeSerializer<E> byteOrderProducingSerializer;
+
+	/** Wrapper to batch all writes to RocksDB. */
+	@Nonnull
+	private final RocksDBWriteBatchWrapper batchWrapper;
+
+	/** The key-group id in serialized form. */
+	@Nonnull
+	private final byte[] groupPrefixBytes;
+
+	/** Output view that helps to serialize elements. */
+	@Nonnull
+	private final ByteArrayDataOutputView outputView;
+
+	/** Input view that helps to de-serialize elements. */
+	@Nonnull
+	private final ByteArrayDataInputView inputView;
+
+	/** In memory cache that holds a head-subset of the elements stored in RocksDB. */
+	@Nonnull
+	private final OrderedByteArraySetCache orderedCache;
+
+	/** This holds the key that we use to seek to the first element in RocksDB, to improve seek/iterator performance. */
+	@Nonnull
+	private byte[] seekHint;
+
+	/** Cache for the head element in de-serialized form. */
+	@Nullable
+	private E peekCache;
+
+	/** This flag is true iff all elements in RocksDB are also contained in the cache. */
+	private boolean allElementsInCache;
+
+	/** Index for management as a {@link HeapPriorityQueueElement}. */
+	private int internalIndex;
+
+	RocksDBCachingPriorityQueueSet(
+		@Nonnegative int keyGroupId,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnull RocksDB db,
+		@Nonnull ColumnFamilyHandle columnFamilyHandle,
+		@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
+		@Nonnull ByteArrayDataOutputView outputStream,
+		@Nonnull ByteArrayDataInputView inputStream,
+		@Nonnull RocksDBWriteBatchWrapper batchWrapper,
+		@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
+		this.db = db;
+		this.columnFamilyHandle = columnFamilyHandle;
+		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+		this.batchWrapper = batchWrapper;
+		this.outputView = outputStream;
+		this.inputView = inputStream;
+		this.orderedCache = orderedByteArraySetCache;
+		this.allElementsInCache = false;
+		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
+		this.seekHint = groupPrefixBytes;
+		this.internalIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+	}
+
+	@Nullable
+	@Override
+	public E peek() {
+
+		checkRefillCacheFromStore();
+
+		if (peekCache != null) {
+			return peekCache;
+		}
+
+		byte[] firstBytes = orderedCache.peekFirst();
+		if (firstBytes != null) {
+			peekCache = deserializeElement(firstBytes);
+			return peekCache;
+		} else {
+			return null;
+		}
+	}
+
+	@Nullable
+	@Override
+	public E poll() {
+
+		checkRefillCacheFromStore();
+
+		final byte[] firstBytes = orderedCache.pollFirst();
+
+		if (firstBytes == null) {
+			return null;
+		}
+
+		// write-through sync
+		removeFromRocksDB(firstBytes);
+
+		if (orderedCache.isEmpty()) {
+			seekHint = firstBytes;
+		}
+
+		if (peekCache != null) {
+			E fromCache = peekCache;
+			peekCache = null;
+			return fromCache;
+		} else {
+			return deserializeElement(firstBytes);
+		}
+	}
+
+	@Override
+	public boolean add(@Nonnull E toAdd) {
+
+		checkRefillCacheFromStore();
+
+		final byte[] toAddBytes = serializeElement(toAdd);
+
+		final boolean cacheFull = orderedCache.isFull();
+
+		if ((!cacheFull && allElementsInCache) ||
+			LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
+
+			if (cacheFull) {
+				// we drop the element with lowest priority from the cache
+				orderedCache.pollLast();
+				// the dropped element is now only in the store
+				allElementsInCache = false;
+			}
+
+			if (orderedCache.add(toAddBytes)) {
+				// write-through sync
+				addToRocksDB(toAddBytes);
+				if (toAddBytes == orderedCache.peekFirst()) {
+					peekCache = null;
+					return true;
+				}
+			}
+		} else {
+			// we only added to the store
+			addToRocksDB(toAddBytes);
+			allElementsInCache = false;
+		}
+		return false;
+	}
+
+	@Override
+	public boolean remove(@Nonnull E toRemove) {
+
+		checkRefillCacheFromStore();
+
+		final byte[] oldHead = orderedCache.peekFirst();
+
+		if (oldHead == null) {
+			return false;
+		}
+
+		final byte[] toRemoveBytes = serializeElement(toRemove);
+
+		// write-through sync
+		removeFromRocksDB(toRemoveBytes);
+		orderedCache.remove(toRemoveBytes);
+
+		if (orderedCache.isEmpty()) {
+			seekHint = toRemoveBytes;
+			peekCache = null;
+			return true;
+		}
+
+		if (oldHead != orderedCache.peekFirst()) {
+			peekCache = null;
+			return true;
+		}
+
+		return false;
+	}
+
+	@Override
+	public void addAll(@Nullable Collection<? extends E> toAdd) {
+
+		if (toAdd == null) {
+			return;
+		}
+
+		for (E element : toAdd) {
+			add(element);
+		}
+	}
+
+	@Override
+	public boolean isEmpty() {
+		checkRefillCacheFromStore();
+		return orderedCache.isEmpty();
+	}
+
+	@Nonnull
+	@Override
+	public CloseableIterator<E> iterator() {
+		return new DeserializingIteratorWrapper(orderedBytesIterator());
+	}
+
+	/**
+	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+	 * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+	 */
+	@Override
+	public int size() {
+
+		if (allElementsInCache) {
+			return orderedCache.size();
+		} else {
+			int count = 0;
+			try (final RocksBytesIterator iterator = orderedBytesIterator()) {
+				while (iterator.hasNext()) {
+					iterator.next();
+					++count;
+				}
+			}
+			return count;
+		}
+	}
+
+	@Override
+	public int getInternalIndex() {
+		return internalIndex;
+	}
+
+	@Override
+	public void setInternalIndex(int newIndex) {
+		this.internalIndex = newIndex;
+	}
+
+	@Nonnull
+	private RocksBytesIterator orderedBytesIterator() {
+		flushWriteBatch();
+		return new RocksBytesIterator(
+			new RocksIteratorWrapper(
+				db.newIterator(columnFamilyHandle)));
+	}
+
+	/**
+	 * Ensures that recent writes are flushed and reflected in the RocksDB instance.
+	 */
+	private void flushWriteBatch() {
+		try {
+			batchWrapper.flush();
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	private void addToRocksDB(@Nonnull byte[] toAddBytes) {
+		try {
+			batchWrapper.put(columnFamilyHandle, toAddBytes, DUMMY_BYTES);
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	private void removeFromRocksDB(@Nonnull byte[] toRemoveBytes) {
+		try {
+			batchWrapper.remove(columnFamilyHandle, toRemoveBytes);
+		} catch (RocksDBException e) {
+			throw new FlinkRuntimeException(e);
+		}
+	}
+
+	private void checkRefillCacheFromStore() {
+		if (!allElementsInCache && orderedCache.isEmpty()) {
+			try (final RocksBytesIterator iterator = orderedBytesIterator()) {
+				orderedCache.bulkLoadFromOrderedIterator(iterator);
+				allElementsInCache = !iterator.hasNext();
+			} catch (Exception e) {
+				throw new FlinkRuntimeException("Exception while refilling store from iterator.", e);
+			}
+		}
+	}
+
+	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+		for (int i = 0; i < prefixBytes.length; ++i) {
+			if (bytes[i] != prefixBytes[i]) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Nonnull
+	private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
+
+		outputView.reset();
+
+		try {
+			RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+		}
+
+		return outputView.toByteArray();
+	}
+
+	@Nonnull
+	private byte[] serializeElement(@Nonnull E element) {
+		try {
+			outputView.reset();
+			outputView.write(groupPrefixBytes);
+			byteOrderProducingSerializer.serialize(element, outputView);
+			return outputView.toByteArray();
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Error while serializing the element.", e);
+		}
+	}
+
+	@Nonnull
+	private E deserializeElement(@Nonnull byte[] bytes) {
+		try {
+			inputView.setData(bytes, groupPrefixBytes.length, bytes.length);
+			return byteOrderProducingSerializer.deserialize(inputView);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Error while deserializing the element.", e);
+		}
+	}
+
+	/**
+	 * Wraps an iterator over byte-arrays with deserialization logic, so that it iterates over deserialized objects.
+	 */
+	private class DeserializingIteratorWrapper implements CloseableIterator<E> {
+
+		/** The iterator over byte-arrays with the serialized objects. */
+		@Nonnull
+		private final CloseableIterator<byte[]> bytesIterator;
+
+		private DeserializingIteratorWrapper(@Nonnull CloseableIterator<byte[]> bytesIterator) {
+			this.bytesIterator = bytesIterator;
+		}
+
+		@Override
+		public void close() throws Exception {
+			bytesIterator.close();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return bytesIterator.hasNext();
+		}
+
+		@Override
+		public E next() {
+			return deserializeElement(bytesIterator.next());
+		}
+	}
+
+	/**
+	 * Adapter between RocksDB iterator and Java iterator. This is also closeable to release the native resources after
+	 * use.
+	 */
+	private class RocksBytesIterator implements CloseableIterator<byte[]> {
+
+		/** The RocksDB iterator to which we forward operations. */
+		@Nonnull
+		private final RocksIteratorWrapper iterator;
+
+		/** Cache for the current element of the iteration. */
+		@Nullable
+		private byte[] currentElement;
+
+		private RocksBytesIterator(@Nonnull RocksIteratorWrapper iterator) {
+			this.iterator = iterator;
+			try {
+				// We use our knowledge about the lower bound to issue a seek that is as close to the first element in
+				// the key-group as possible, i.e. we generate the next possible key after seekHint by appending one
+				// zero-byte.
+				iterator.seek(Arrays.copyOf(seekHint, seekHint.length + 1));
+				currentElement = nextElementIfAvailable();
+			} catch (Exception ex) {
+				// ensure resource cleanup also in the face of (runtime) exceptions in the constructor.
+				iterator.close();
+				throw new FlinkRuntimeException("Could not initialize ordered iterator.", ex);
+			}
+		}
+
+		@Override
+		public void close() {
+			iterator.close();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return currentElement != null;
+		}
+
+		@Override
+		public byte[] next() {
+			final byte[] returnElement = this.currentElement;
+			if (returnElement == null) {
+				throw new NoSuchElementException("Iterator has no more elements!");
+			}
+			iterator.next();
+			currentElement = nextElementIfAvailable();
+			return returnElement;
+		}
+
+		private byte[] nextElementIfAvailable() {
+			final byte[] elementBytes;
+			return iterator.isValid()
+				&& isPrefixWith((elementBytes = iterator.key()), groupPrefixBytes) ? elementBytes : null;
+		}
+	}
+
+	/**
+	 * Cache that is organized as an ordered set of byte-arrays. The byte-arrays are sorted in lexicographic order
+	 * of their content. Caches typically have a bounded size.
+	 */
+	public interface OrderedByteArraySetCache {
+
+		/** Comparator for byte arrays. */
+		Comparator<byte[]> LEXICOGRAPHIC_BYTE_COMPARATOR = UnsignedBytes.lexicographicalComparator();
+
+		/**
+		 * Returns the number of contained elements.
+		 */
+		int size();
+
+		/**
+		 * Returns the maximum number of elements that can be stored in the cache.
+		 */
+		int maxSize();
+
+		/**
+		 * Returns <code>size() == 0</code>.
+		 */
+		boolean isEmpty();
+
+		/**
+		 * Returns <code>size() == maxSize()</code>.
+		 */
+		boolean isFull();
+
+		/**
+		 * Adds the given element, if it was not already contained. Returns <code>true</code> iff the cache was modified.
+		 */
+		boolean add(@Nonnull byte[] toAdd);
+
+		/**
+		 * Removes the given element, if it is contained. Returns <code>true</code> iff the cache was modified.
+		 */
+		boolean remove(@Nonnull byte[] toRemove);
+
+		/**
+		 * Returns the first element or <code>null</code> if empty.
+		 */
+		@Nullable
+		byte[] peekFirst();
+
+		/**
+		 * Returns the last element or <code>null</code> if empty.
+		 */
+		@Nullable
+		byte[] peekLast();
+
+		/**
+		 * Returns and removes the first element or returns <code>null</code> if empty.
+		 */
+		@Nullable
+		byte[] pollFirst();
+
+		/**
+		 * Returns and removes the last element or returns <code>null</code> if empty.
+		 */
+		@Nullable
+		byte[] pollLast();
+
+		/**
+		 * Clears the cache and adds up to <code>maxSize()</code> elements from the iterator to the cache.
+		 * Iterator must be ordered in the same order as this cache.
+		 *
+		 * @param orderedIterator iterator with elements in-order.
+		 */
+		void bulkLoadFromOrderedIterator(@Nonnull Iterator<byte[]> orderedIterator);
+	}
+}
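
A small illustration of the seek-hint trick used by RocksBytesIterator above: appending a single zero byte yields the smallest byte sequence that is strictly greater than the hint in unsigned lexicographic order, so the seek lands just past the already-removed head key instead of on its tombstone. A sketch in plain Java, without the shaded Guava comparator:

import java.util.Arrays;

public class SeekHintSketch {
	public static void main(String[] args) {
		byte[] seekHint = {0x01, 0x7F};
		// Smallest strict successor of seekHint among all byte strings:
		byte[] successor = Arrays.copyOf(seekHint, seekHint.length + 1); // {0x01, 0x7F, 0x00}
		// successor > seekHint because it is a proper extension of it, and no
		// key k can satisfy seekHint < k < successor: any such k would have to
		// extend seekHint with a first extra byte smaller than 0x00.
		System.out.println(Arrays.toString(successor));
	}
}
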
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 72aa4fbaeb2..4af5a2775d8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -39,6 +39,8 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
@@ -84,13 +86,10 @@
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.TieBreakingPriorityComparator;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
-import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.ExceptionUtils;
@@ -262,6 +261,7 @@
 	/** Factory for priority queue state. */
 	private final PriorityQueueSetFactory priorityQueueFactory;
 
+	/** Shared wrapper for batch writes to the RocksDB instance. */
 	private RocksDBWriteBatchWrapper writeBatchWrapper;
 
 	public RocksDBKeyedStateBackend(
@@ -458,6 +458,11 @@ public int getKeyGroupPrefixBytes() {
 		return keyGroupPrefixBytes;
 	}
 
+	@VisibleForTesting
+	PriorityQueueSetFactory getPriorityQueueFactory() {
+		return priorityQueueFactory;
+	}
+
 	public WriteOptions getWriteOptions() {
 		return writeOptions;
 	}
@@ -481,6 +486,9 @@ public WriteOptions getWriteOptions() {
 		final CheckpointStreamFactory streamFactory,
 		CheckpointOptions checkpointOptions) throws Exception {
 
+		// flush everything into db before taking a snapshot
+		writeBatchWrapper.flush();
+
 		return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
 	}
 
@@ -1775,9 +1783,6 @@ public void close() {
 
 			final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
 
-			// flush everything into db before taking a snapshot
-			writeBatchWrapper.flush();
-
 			final RocksDBFullSnapshotOperation<K> snapshotOperation =
 				new RocksDBFullSnapshotOperation<>(
 					RocksDBKeyedStateBackend.this,
@@ -2643,79 +2648,54 @@ public static RocksIteratorWrapper getRocksIterator(
 	class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
 		/** Default cache size per key-group. */
-		private static final int DEFAULT_CACHES_SIZE = 1024;
+		private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
 
 		/** A shared buffer to serialize elements for the priority queue. */
 		@Nonnull
-		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
+		private final ByteArrayDataOutputView sharedElementOutView;
 
-		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
+		/** A shared buffer to de-serialize elements for the priority queue. */
 		@Nonnull
-		private final DataOutputViewStreamWrapper elementSerializationOutView;
+		private final ByteArrayDataInputView sharedElementInView;
 
 		RocksDBPriorityQueueSetFactory() {
-			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
-			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
+			this.sharedElementOutView = new ByteArrayDataOutputView();
+			this.sharedElementInView = new ByteArrayDataInputView();
 		}
 
 		@Nonnull
 		@Override
 		public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T>
-		create(
-			@Nonnull String stateName,
-			@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
-
-			final PriorityComparator<T> priorityComparator =
-				PriorityComparator.forPriorityComparableObjects();
+		create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
 
-			Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
-				kvStateInformation.get(stateName);
-
-			if (entry == null) {
-				RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
-					new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
-
-				final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
-
-				entry = new Tuple2<>(columnFamilyHandle, metaInfo);
-				kvStateInformation.put(stateName, entry);
-			}
+			final Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+				tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer);
 
 			final ColumnFamilyHandle columnFamilyHandle = entry.f0;
 
-			@Nonnull
-			TieBreakingPriorityComparator<T> tieBreakingComparator =
-				new TieBreakingPriorityComparator<>(
-					priorityComparator,
-					byteOrderedElementSerializer,
-					elementSerializationOutStream,
-					elementSerializationOutView);
-
 			return new KeyGroupPartitionedPriorityQueue<>(
 				KeyExtractorFunction.forKeyedObjects(),
-				priorityComparator,
-				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
+				PriorityComparator.forPriorityComparableObjects(),
+				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, RocksDBCachingPriorityQueueSet<T>>() {
 					@Nonnull
 					@Override
-					public CachingInternalPriorityQueueSet<T> create(
+					public RocksDBCachingPriorityQueueSet<T> create(
 						int keyGroupId,
 						int numKeyGroups,
+						@Nonnull KeyExtractorFunction<T> keyExtractor,
 						@Nonnull PriorityComparator<T> elementPriorityComparator) {
-
-						CachingInternalPriorityQueueSet.OrderedSetCache<T> cache =
-							new TreeOrderedSetCache<>(tieBreakingComparator, DEFAULT_CACHES_SIZE);
-						CachingInternalPriorityQueueSet.OrderedSetStore<T> store =
-							new RocksDBOrderedSetStore<>(
-								keyGroupId,
-								keyGroupPrefixBytes,
-								db,
-								columnFamilyHandle,
-								byteOrderedElementSerializer,
-								elementSerializationOutStream,
-								elementSerializationOutView,
-								writeBatchWrapper);
-
-						return new CachingInternalPriorityQueueSet<>(cache, store);
+						TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
+						return new RocksDBCachingPriorityQueueSet<>(
+							keyGroupId,
+							keyGroupPrefixBytes,
+							db,
+							columnFamilyHandle,
+							byteOrderedElementSerializer,
+							sharedElementOutView,
+							sharedElementInView,
+							writeBatchWrapper,
+							orderedSetCache
+						);
 					}
 				},
 				keyGroupRange,
@@ -2723,6 +2703,27 @@ public static RocksIteratorWrapper getRocksIterator(
 		}
 	}
 
+	@Nonnull
+	private <T> Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> tryRegisterPriorityQueueMetaInfo(
+		@Nonnull String stateName,
+		@Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+
+		Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> entry =
+			kvStateInformation.get(stateName);
+
+		if (entry == null) {
+			RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
+				new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);
+
+			final ColumnFamilyHandle columnFamilyHandle = createColumnFamily(stateName);
+
+			entry = new Tuple2<>(columnFamilyHandle, metaInfo);
+			kvStateInformation.put(stateName, entry);
+		}
+
+		return entry;
+	}
+
 	@Override
 	public boolean requiresLegacySynchronousTimerSnapshots() {
 		return priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 18f9ec95244..c85a7b2077c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -39,9 +39,10 @@
 	/**
 	 * Choice of timer service implementation.
 	 */
-	public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
-		.key("state.backend.rocksdb.timer-service.impl")
+	public static final ConfigOption<String> TIMER_SERVICE_FACTORY = ConfigOptions
+		.key("state.backend.rocksdb.timer-service.factory")
 		.defaultValue(HEAP.name())
-		.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
-			"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKSDB.name()));
+		.withDescription(String.format("This determines the factory for the timer service state implementation. Options " +
+			"are either %s (heap-based, default) or %s for an implementation based on RocksDB.",
+			HEAP.name(), ROCKSDB.name()));
 }
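
With this rename, the timer service backing is selected through the key state.backend.rocksdb.timer-service.factory, e.g. set to ROCKSDB in flink-conf.yaml. A programmatic sketch, relying on the configuration being applied when the backend is re-configured from a Configuration as shown in the RocksDBStateBackend diff below:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;

Configuration config = new Configuration();
// Store timers in RocksDB instead of the default heap-based implementation.
config.setString(RocksDBOptions.TIMER_SERVICE_FACTORY, "ROCKSDB");
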
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
deleted file mode 100644
index 4068c508955..00000000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
-import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
-/**
- * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
- * based on RocksDB.
- *
- * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
- * produced by the provided serializer for the elements!
- *
- * @param <T> the type of stored elements.
- */
-public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
-
-	/** Serialized empty value to insert into RocksDB. */
-	private static final byte[] DUMMY_BYTES = new byte[] {0};
-
-	/** The RocksDB instance that serves as store. */
-	@Nonnull
-	private final RocksDB db;
-
-	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
-	@Nonnull
-	private final ColumnFamilyHandle columnFamilyHandle;
-
-	/**
-	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
-	 * aligned with their logical order.
-	 */
-	@Nonnull
-	private final TypeSerializer<T> byteOrderProducingSerializer;
-
-	/** Wrapper to batch all writes to RocksDB. */
-	@Nonnull
-	private final RocksDBWriteBatchWrapper batchWrapper;
-
-	/** The key-group id in serialized form. */
-	@Nonnull
-	private final byte[] groupPrefixBytes;
-
-	/** Output stream that helps to serialize elements. */
-	@Nonnull
-	private final ByteArrayOutputStreamWithPos outputStream;
-
-	/** Output view that helps to serialize elements, must wrap the output stream. */
-	@Nonnull
-	private final DataOutputViewStreamWrapper outputView;
-
-	public RocksDBOrderedSetStore(
-		@Nonnegative int keyGroupId,
-		@Nonnegative int keyGroupPrefixBytes,
-		@Nonnull RocksDB db,
-		@Nonnull ColumnFamilyHandle columnFamilyHandle,
-		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
-		@Nonnull ByteArrayOutputStreamWithPos outputStream,
-		@Nonnull DataOutputViewStreamWrapper outputView,
-		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
-		this.db = db;
-		this.columnFamilyHandle = columnFamilyHandle;
-		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
-		this.outputStream = outputStream;
-		this.outputView = outputView;
-		this.batchWrapper = batchWrapper;
-		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, keyGroupPrefixBytes);
-	}
-
-	private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
-
-		outputStream.reset();
-
-		try {
-			RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, numPrefixBytes, outputView);
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
-		}
-
-		return outputStream.toByteArray();
-	}
-
-	@Override
-	public void add(@Nonnull T element) {
-		byte[] elementBytes = serializeElement(element);
-		try {
-			batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
-		} catch (RocksDBException e) {
-			throw new FlinkRuntimeException("Error while getting element from RocksDB.", e);
-		}
-	}
-
-	@Override
-	public void remove(@Nonnull T element) {
-		byte[] elementBytes = serializeElement(element);
-		try {
-			batchWrapper.remove(columnFamilyHandle, elementBytes);
-		} catch (RocksDBException e) {
-			throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
-		}
-	}
-
-	/**
-	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
-	 * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
-	 *
-	 * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
-	 */
-	@Override
-	public int size() {
-
-		int count = 0;
-		try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
-			while (iterator.hasNext()) {
-				iterator.next();
-				++count;
-			}
-		}
-
-		return count;
-	}
-
-	@Nonnull
-	@Override
-	public RocksToJavaIteratorAdapter orderedIterator() {
-
-		flushWriteBatch();
-		return new RocksToJavaIteratorAdapter(
-			new RocksIteratorWrapper(
-				db.newIterator(columnFamilyHandle)));
-	}
-
-	/**
-	 * Ensures that recent writes are flushed and reflect in the RocksDB instance.
-	 */
-	private void flushWriteBatch() {
-		try {
-			batchWrapper.flush();
-		} catch (RocksDBException e) {
-			throw new FlinkRuntimeException(e);
-		}
-	}
-
-	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
-		for (int i = 0; i < prefixBytes.length; ++i) {
-			if (bytes[i] != prefixBytes[i]) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private byte[] serializeElement(T element) {
-		try {
-			outputStream.reset();
-			outputView.write(groupPrefixBytes);
-			byteOrderProducingSerializer.serialize(element, outputView);
-			return outputStream.toByteArray();
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Error while serializing the element.", e);
-		}
-	}
-
-	private T deserializeElement(byte[] bytes) {
-		try {
-			// TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call
-			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
-			DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
-			inputView.skipBytes(groupPrefixBytes.length);
-			return byteOrderProducingSerializer.deserialize(inputView);
-		} catch (IOException e) {
-			throw new FlinkRuntimeException("Error while deserializing the element.", e);
-		}
-	}
-
-	/**
-	 * Adapter between RocksDB iterator and Java iterator. This is also closeable to release the native resources after
-	 * use.
-	 */
-	private class RocksToJavaIteratorAdapter implements CloseableIterator<T> {
-
-		/** The RocksDb iterator to which we forward ops. */
-		@Nonnull
-		private final RocksIteratorWrapper iterator;
-
-		/** Cache for the current element of the iteration. */
-		@Nullable
-		private T currentElement;
-
-		private RocksToJavaIteratorAdapter(@Nonnull RocksIteratorWrapper iterator) {
-			this.iterator = iterator;
-			try {
-				// TODO we could check if it is more efficient to make the seek more specific, e.g. with a provided hint
-				// that is lexicographically closer the first expected element in the key-group. I wonder if this could
-				// help to improve the seek if there are many tombstones for elements at the beginning of the key-group
-				// (like for elements that have been removed in previous polling, before they are compacted away).
-				iterator.seek(groupPrefixBytes);
-				deserializeNextElementIfAvailable();
-			} catch (Exception ex) {
-				// ensure resource cleanup also in the face of (runtime) exceptions in the constructor.
-				iterator.close();
-				throw new FlinkRuntimeException("Could not initialize ordered iterator.", ex);
-			}
-		}
-
-		@Override
-		public void close() {
-			iterator.close();
-		}
-
-		@Override
-		public boolean hasNext() {
-			return currentElement != null;
-		}
-
-		@Override
-		public T next() {
-			final T returnElement = this.currentElement;
-			if (returnElement == null) {
-				throw new NoSuchElementException("Iterator has no more elements!");
-			}
-			iterator.next();
-			deserializeNextElementIfAvailable();
-			return returnElement;
-		}
-
-		private void deserializeNextElementIfAvailable() {
-			if (iterator.isValid()) {
-				final byte[] elementBytes = iterator.key();
-				if (isPrefixWith(elementBytes, groupPrefixBytes)) {
-					this.currentElement = deserializeElement(elementBytes);
-				} else {
-					this.currentElement = null;
-				}
-			} else {
-				this.currentElement = null;
-			}
-		}
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 056484939e7..b8bd73c08e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -60,7 +60,7 @@
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
+import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -271,7 +271,7 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
 		this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
 			config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
-		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL);
+		final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);
 
 		this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
 			PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/TreeOrderedSetCache.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/TreeOrderedSetCache.java
new file mode 100644
index 00000000000..c48cc745ae3
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/TreeOrderedSetCache.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.TreeSet;
+
+/**
+ * Implementation of a lexicographically ordered set of byte-arrays, based on a {@link TreeSet}.
+ */
+public class TreeOrderedSetCache implements RocksDBCachingPriorityQueueSet.OrderedByteArraySetCache {
+
+	/** Maximum capacity. */
+	private final int maxSize;
+
+	@Nonnull
+	private final TreeSet<byte[]> treeSet;
+
+	TreeOrderedSetCache(int maxSize) {
+		this.maxSize = maxSize;
+		this.treeSet = new TreeSet<>(LEXICOGRAPHIC_BYTE_COMPARATOR);
+	}
+
+	@Override
+	public int size() {
+		return treeSet.size();
+	}
+
+	@Override
+	public int maxSize() {
+		return maxSize;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return treeSet.isEmpty();
+	}
+
+	@Override
+	public boolean isFull() {
+		return treeSet.size() >= maxSize;
+	}
+
+	@Override
+	public boolean add(@Nonnull byte[] toAdd) {
+		return treeSet.add(toAdd);
+	}
+
+	@Override
+	public boolean remove(@Nonnull byte[] toRemove) {
+		return treeSet.remove(toRemove);
+	}
+
+	@Nullable
+	@Override
+	public byte[] peekFirst() {
+		return !isEmpty() ? treeSet.first() : null;
+	}
+
+	@Nullable
+	@Override
+	public byte[] peekLast() {
+		return !isEmpty() ? treeSet.last() : null;
+	}
+
+	@Nullable
+	@Override
+	public byte[] pollFirst() {
+		return !isEmpty() ? treeSet.pollFirst() : null;
+	}
+
+	@Nullable
+	@Override
+	public byte[] pollLast() {
+		return !isEmpty() ? treeSet.pollLast() : null;
+	}
+
+	@Override
+	public void bulkLoadFromOrderedIterator(@Nonnull Iterator<byte[]> orderedIterator) {
+		treeSet.clear();
+		for (int i = maxSize; --i >= 0 && orderedIterator.hasNext(); ) {
+			treeSet.add(orderedIterator.next());
+		}
+	}
+}
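
To illustrate the contract of the new cache, here is a minimal sketch; the class name is hypothetical, and it sits in the same package because the constructor is package-private:

package org.apache.flink.contrib.streaming.state;

public class TreeOrderedSetCacheExample {
    public static void main(String[] args) {
        TreeOrderedSetCache cache = new TreeOrderedSetCache(2);
        cache.add(new byte[] {2});
        cache.add(new byte[] {1});
        cache.add(new byte[] {1});                // same contents again: rejected (set semantics)
        System.out.println(cache.isFull());       // true, size() has reached maxSize()
        System.out.println(cache.peekFirst()[0]); // 1, the lexicographically smallest entry
        System.out.println(cache.pollLast()[0]);  // 2, removes the largest entry
        System.out.println(cache.size());         // 1
    }
}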
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
deleted file mode 100644
index 5f26835282c..00000000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSetTestBase;
-import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
-
-import org.junit.Rule;
-
-/**
- * Test for {@link CachingInternalPriorityQueueSet} with a
- * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} based on RocksDB.
- */
-public class CachingInternalPriorityQueueSetWithRocksDBStoreTest extends CachingInternalPriorityQueueSetTestBase {
-
-	@Rule
-	public final RocksDBResource rocksDBResource = new RocksDBResource();
-
-	@Override
-	protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore() {
-		return createRocksDBStore(0, 1, rocksDBResource);
-	}
-
-	@Override
-	protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache() {
-		return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
-	}
-
-	public static CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createRocksDBStore(
-		int keyGroupId,
-		int totalKeyGroups,
-		RocksDBResource rocksDBResource) {
-		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(16);
-		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
-		int prefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
-		return new RocksDBOrderedSetStore<>(
-			keyGroupId,
-			prefixBytes,
-			rocksDBResource.getRocksDB(),
-			rocksDBResource.getDefaultColumnFamily(),
-			TestElementSerializer.INSTANCE,
-			outputStream,
-			outputView,
-			rocksDBResource.getBatchWrapper());
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
index 42782a40e87..ad8b74c975f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -18,36 +18,54 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
 import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
-import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueueTest;
-import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
 
 import org.junit.Rule;
 
 /**
- * Test of {@link KeyGroupPartitionedPriorityQueue} powered by a {@link RocksDBOrderedSetStore}.
+ * Test of {@link KeyGroupPartitionedPriorityQueue} powered by a {@link RocksDBCachingPriorityQueueSet}.
  */
-public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends KeyGroupPartitionedPriorityQueueTest {
+public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends InternalPriorityQueueTestBase {
 
 	@Rule
 	public final RocksDBResource rocksDBResource = new RocksDBResource();
 
 	@Override
-	protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
-			TestElement, CachingInternalPriorityQueueSet<TestElement>> newFactory(
-		int initialCapacity) {
-
-		return (keyGroupId, numKeyGroups, elementComparator) -> {
-			CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache =
-				new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
-			CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store =
-				RocksDBOrderedSetStoreTest.createRocksDBOrderedStore(
-					rocksDBResource,
-					TestElementSerializer.INSTANCE,
-					keyGroupId,
-					numKeyGroups);
-			return new CachingInternalPriorityQueueSet<>(cache, store);
+	protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+		return new KeyGroupPartitionedPriorityQueue<>(
+			KEY_EXTRACTOR_FUNCTION,
+			TEST_ELEMENT_PRIORITY_COMPARATOR,
+			newFactory(),
+			KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups());
+	}
+
+	@Override
+	protected boolean testSetSemanticsAgainstDuplicateElements() {
+		return true;
+	}
+
+	private KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+		TestElement, RocksDBCachingPriorityQueueSet<TestElement>> newFactory() {
+
+		return (keyGroupId, numKeyGroups, keyExtractorFunction, elementComparator) -> {
+			ByteArrayDataOutputView outputStreamWithPos = new ByteArrayDataOutputView();
+			ByteArrayDataInputView inputStreamWithPos = new ByteArrayDataInputView();
+			int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(numKeyGroups);
+			TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(32);
+			return new RocksDBCachingPriorityQueueSet<>(
+				keyGroupId,
+				keyGroupPrefixBytes,
+				rocksDBResource.getRocksDB(),
+				rocksDBResource.getDefaultColumnFamily(),
+				TestElementSerializer.INSTANCE,
+				outputStreamWithPos,
+				inputStreamWithPos,
+				rocksDBResource.getBatchWrapper(),
+				orderedSetCache);
 		};
 	}
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index a8e5a36fbff..e34463888f1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -256,7 +256,7 @@ public void testCancelFullyAsyncCheckpoints() throws Exception {
 
 		File dbDir = temporaryFolder.newFolder();
 
-		final RocksDBStateBackend.PriorityQueueStateType timerServicePriorityQueueType = RocksDBStateBackend.PriorityQueueStateType.valueOf(RocksDBOptions.TIMER_SERVICE_IMPL.defaultValue());
+		final RocksDBStateBackend.PriorityQueueStateType timerServicePriorityQueueType = RocksDBStateBackend.PriorityQueueStateType.valueOf(RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue());
 
 		final int skipStreams;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
deleted file mode 100644
index 0b1d07bd082..00000000000
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
-import org.apache.flink.util.CloseableIterator;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-
-import java.util.NoSuchElementException;
-
-/**
- * Test for RocksDBOrderedStore.
- */
-public class RocksDBOrderedSetStoreTest {
-
-	@Rule
-	public final RocksDBResource rocksDBResource = new RocksDBResource();
-
-	@Test
-	public void testOrderedIterator() throws Exception {
-		CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store = createRocksDBOrderedStore();
-
-		//test empty iterator
-		try (final CloseableIterator<Integer> emptyIterator = store.orderedIterator()) {
-			Assert.assertFalse(emptyIterator.hasNext());
-			try {
-				emptyIterator.next();
-				Assert.fail();
-			} catch (NoSuchElementException expected) {
-			}
-		}
-
-		store.add(43);
-		store.add(42);
-		store.add(41);
-		store.add(41);
-		store.remove(42);
-
-		// test in-order iteration
-		try (final CloseableIterator<Integer> iterator = store.orderedIterator()) {
-			Assert.assertTrue(iterator.hasNext());
-			Assert.assertEquals(Integer.valueOf(41), iterator.next());
-			Assert.assertTrue(iterator.hasNext());
-			Assert.assertEquals(Integer.valueOf(43), iterator.next());
-			Assert.assertFalse(iterator.hasNext());
-			try {
-				iterator.next();
-				Assert.fail();
-			} catch (NoSuchElementException expected) {
-			}
-		}
-	}
-
-	@Test
-	public void testAddRemoveSize() {
-
-		CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store = createRocksDBOrderedStore();
-
-		// test empty size
-		Assert.assertEquals(0, store.size());
-
-		// test add uniques
-		store.remove(41);
-		Assert.assertEquals(0, store.size());
-		store.add(41);
-		Assert.assertEquals(1, store.size());
-		store.add(42);
-		Assert.assertEquals(2, store.size());
-		store.add(43);
-		Assert.assertEquals(3, store.size());
-		store.add(44);
-		Assert.assertEquals(4, store.size());
-		store.add(45);
-		Assert.assertEquals(5, store.size());
-
-		// test remove
-		store.remove(41);
-		Assert.assertEquals(4, store.size());
-		store.remove(41);
-		Assert.assertEquals(4, store.size());
-
-		// test set semantics by attempt to insert duplicate
-		store.add(42);
-		Assert.assertEquals(4, store.size());
-	}
-
-	public static <E> RocksDBOrderedSetStore<E> createRocksDBOrderedStore(
-		@Nonnull RocksDBResource rocksDBResource,
-		@Nonnull TypeSerializer<E> byteOrderSerializer,
-		@Nonnegative int keyGroupId,
-		@Nonnegative int totalKeyGroups) {
-
-		ByteArrayOutputStreamWithPos outputStreamWithPos = new ByteArrayOutputStreamWithPos(32);
-		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStreamWithPos);
-		int keyGroupPrefixBytes = RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
-		return new RocksDBOrderedSetStore<>(
-			keyGroupId,
-			keyGroupPrefixBytes,
-			rocksDBResource.getRocksDB(),
-			rocksDBResource.getDefaultColumnFamily(),
-			byteOrderSerializer,
-			outputStreamWithPos,
-			outputView,
-			rocksDBResource.getBatchWrapper());
-	}
-
-	protected RocksDBOrderedSetStore<Integer> createRocksDBOrderedStore() {
-		return createRocksDBOrderedStore(rocksDBResource, IntSerializer.INSTANCE, 0, 1);
-	}
-}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index d6c3144d94b..b7d05b3455e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -35,11 +35,13 @@
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.IOUtils;
 
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -48,6 +50,7 @@
 import org.rocksdb.DBOptions;
 
 import java.io.File;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -125,6 +128,45 @@ public void testSetDbPath() throws Exception {
 		}
 	}
 
+	@Test
+	public void testConfigureTimerService() throws Exception {
+
+		final Environment env = getMockEnvironment(tempFolder.newFolder());
+
+		// Fix the option key string
+		Assert.assertEquals("state.backend.rocksdb.timer-service.factory", RocksDBOptions.TIMER_SERVICE_FACTORY.key());
+
+		// Fix the option value string and ensure all are covered
+		Assert.assertEquals(2, RocksDBStateBackend.PriorityQueueStateType.values().length);
+		Assert.assertEquals("ROCKSDB", RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+		Assert.assertEquals("HEAP", RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
+
+		// Fix the default
+		Assert.assertEquals(
+			RocksDBStateBackend.PriorityQueueStateType.HEAP.toString(),
+			RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue());
+
+		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
+
+		RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		keyedBackend.restore(Collections.emptyList());
+		Assert.assertEquals(HeapPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
+		keyedBackend.dispose();
+
+		Configuration conf = new Configuration();
+		conf.setString(
+			RocksDBOptions.TIMER_SERVICE_FACTORY,
+			RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+
+		rocksDbBackend = rocksDbBackend.configure(conf);
+		keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
+		keyedBackend.restore(Collections.emptyList());
+		Assert.assertEquals(
+			RocksDBKeyedStateBackend.RocksDBPriorityQueueSetFactory.class,
+			keyedBackend.getPriorityQueueFactory().getClass());
+		keyedBackend.dispose();
+	}
+
 	@Test
 	public void testStoragePathWithFilePrefix() throws Exception {
 		final File folder = tempFolder.newFolder();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 29b89c28113..b54a1a9adba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -37,7 +37,7 @@
 /**
  * An entity keeping all the time-related services available to all operators extending the
  * {@link AbstractStreamOperator}. Right now, this is only a
- * {@link HeapInternalTimerService timer services}.
+ * {@link InternalTimerServiceImpl timer service}.
  *
  * <b>NOTE:</b> These services are only available to keyed operators.
  *
@@ -58,7 +58,7 @@
 	private final PriorityQueueSetFactory priorityQueueSetFactory;
 	private final ProcessingTimeService processingTimeService;
 
-	private final Map<String, HeapInternalTimerService<K, ?>> timerServices;
+	private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
 
 	private final boolean useLegacySynchronousSnapshots;
 
@@ -86,7 +86,7 @@
 		TimerSerializer<K, N> timerSerializer,
 		Triggerable<K, N> triggerable) {
 
-		HeapInternalTimerService<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
+		InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
 
 		timerService.startTimerService(
 			timerSerializer.getKeySerializer(),
@@ -97,11 +97,11 @@
 	}
 
 	@SuppressWarnings("unchecked")
-	<N> HeapInternalTimerService<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
-		HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
+	<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
+		InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
 		if (timerService == null) {
 
-			timerService = new HeapInternalTimerService<>(
+			timerService = new InternalTimerServiceImpl<>(
 				localKeyGroupRange,
 				keyContext,
 				processingTimeService,
@@ -113,7 +113,7 @@
 		return timerService;
 	}
 
-	Map<String, HeapInternalTimerService<K, ?>> getRegisteredTimerServices() {
+	Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
 		return Collections.unmodifiableMap(timerServices);
 	}
 
@@ -126,7 +126,7 @@
 	}
 
 	public void advanceWatermark(Watermark watermark) throws Exception {
-		for (HeapInternalTimerService<?, ?> service : timerServices.values()) {
+		for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
 			service.advanceWatermark(watermark.getTimestamp());
 		}
 	}
@@ -164,7 +164,7 @@ public boolean isUseLegacySynchronousSnapshots() {
 	@VisibleForTesting
 	public int numProcessingTimeTimers() {
 		int count = 0;
-		for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+		for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
 			count += timerService.numProcessingTimeTimers();
 		}
 		return count;
@@ -173,7 +173,7 @@ public int numProcessingTimeTimers() {
 	@VisibleForTesting
 	public int numEventTimeTimers() {
 		int count = 0;
-		for (HeapInternalTimerService<?, ?> timerService : timerServices.values()) {
+		for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
 			count += timerService.numEventTimeTimers();
 		}
 		return count;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
similarity index 92%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index 6c1b1886b94..3bded50378b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -43,7 +43,7 @@
 /**
  * {@link InternalTimerService} that stores timers on the Java heap.
  */
-public class HeapInternalTimerService<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
+public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {
 
 	private final ProcessingTimeService processingTimeService;
 
@@ -95,7 +95,7 @@
 	/** The restored timers snapshot, if any. */
 	private InternalTimersSnapshot<K, N> restoredTimersSnapshot;
 
-	HeapInternalTimerService(
+	InternalTimerServiceImpl(
 		KeyGroupRange localKeyGroupRange,
 		KeyContext keyContext,
 		ProcessingTimeService processingTimeService,
@@ -117,7 +117,7 @@
 	}
 
 	/**
-	 * Starts the local {@link HeapInternalTimerService} by:
+	 * Starts the local {@link InternalTimerServiceImpl} by:
 	 * <ol>
 	 *     <li>Setting the {@code keySerialized} and {@code namespaceSerializer} for the timers it will contain.</li>
 	 *     <li>Setting the {@code triggerTarget} which contains the action to be performed when a timer fires.</li>
@@ -227,37 +227,29 @@ public void onProcessingTime(long time) throws Exception {
 		// inside the callback.
 		nextTimer = null;
 
-		processingTimeTimersQueue.bulkPoll(
-			(timer) -> (timer.getTimestamp() <= time),
-			(timer) -> {
-				keyContext.setCurrentKey(timer.getKey());
-				try {
-					triggerTarget.onProcessingTime(timer);
-				} catch (Exception e) {
-					throw new FlinkRuntimeException("Problem in trigger target.", e);
-				}
-			});
+		InternalTimer<K, N> timer;
 
-		if (nextTimer == null) {
-			final TimerHeapInternalTimer<K, N> timer = processingTimeTimersQueue.peek();
-			if (timer != null) {
-				nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
-			}
+		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
+			processingTimeTimersQueue.poll();
+			keyContext.setCurrentKey(timer.getKey());
+			triggerTarget.onProcessingTime(timer);
+		}
+
+		if (timer != null && nextTimer == null) {
+			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
 		}
 	}
 
 	public void advanceWatermark(long time) throws Exception {
 		currentWatermark = time;
-		eventTimeTimersQueue.bulkPoll(
-			(timer) -> (timer.getTimestamp() <= time),
-			(timer) -> {
-				keyContext.setCurrentKey(timer.getKey());
-				try {
-					triggerTarget.onEventTime(timer);
-				} catch (Exception e) {
-					throw new FlinkRuntimeException("Problem in trigger target.", e);
-				}
-			});
+
+		InternalTimer<K, N> timer;
+
+		while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
+			eventTimeTimersQueue.poll();
+			keyContext.setCurrentKey(timer.getKey());
+			triggerTarget.onEventTime(timer);
+		}
 	}
 
 	/**
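
The performance-relevant change in this hunk is replacing the lambda-based bulkPoll with an explicit peek/poll loop, which drains all due timers without per-call capturing lambdas and without wrapping trigger exceptions in FlinkRuntimeException. A minimal, self-contained sketch of the same draining pattern, with a hypothetical Timer type and a plain java.util.PriorityQueue standing in for the keyed timer queue:

import java.util.PriorityQueue;

public class DrainDueTimersSketch {

    static final class Timer {
        final long timestamp;
        Timer(long timestamp) { this.timestamp = timestamp; }
    }

    public static void main(String[] args) {
        PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
        queue.add(new Timer(5L));
        queue.add(new Timer(10L));

        long currentTime = 7L;
        Timer timer;
        // Peek before polling so a head element that is not yet due stays queued.
        while ((timer = queue.peek()) != null && timer.timestamp <= currentTime) {
            queue.poll();
            // Stands in for triggerTarget.onEventTime(timer) / onProcessingTime(timer).
            System.out.println("firing timer @ " + timer.timestamp);
        }
        // After the loop, 'timer' is the first timer that is not yet due (or null);
        // onProcessingTime uses it to re-register the next physical timer.
    }
}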
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
index ce490b5dfc0..dea17f98665 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java
@@ -77,13 +77,13 @@ public int getVersion() {
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
-		final Map<String, HeapInternalTimerService<K, ?>> registeredTimerServices =
+		final Map<String, InternalTimerServiceImpl<K, ?>> registeredTimerServices =
 			timerServicesManager.getRegisteredTimerServices();
 
 		out.writeInt(registeredTimerServices.size());
-		for (Map.Entry<String, HeapInternalTimerService<K, ?>> entry : registeredTimerServices.entrySet()) {
+		for (Map.Entry<String, InternalTimerServiceImpl<K, ?>> entry : registeredTimerServices.entrySet()) {
 			String serviceName = entry.getKey();
-			HeapInternalTimerService<K, ?> timerService = entry.getValue();
+			InternalTimerServiceImpl<K, ?> timerService = entry.getValue();
 
 			out.writeUTF(serviceName);
 			InternalTimersSnapshotReaderWriters
@@ -104,7 +104,7 @@ protected void read(DataInputView in, boolean wasVersioned) throws IOException {
 				.getReaderForVersion(readerVersion, userCodeClassLoader)
 				.readTimersSnapshot(in);
 
-			HeapInternalTimerService<K, ?> timerService = registerOrGetTimerService(
+			InternalTimerServiceImpl<K, ?> timerService = registerOrGetTimerService(
 				serviceName,
 				restoredTimersSnapshot);
 
@@ -113,7 +113,7 @@ protected void read(DataInputView in, boolean wasVersioned) throws IOException {
 	}
 
 	@SuppressWarnings("unchecked")
-	private <N> HeapInternalTimerService<K, N> registerOrGetTimerService(
+	private <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
 		String serviceName, InternalTimersSnapshot<?, ?> restoredTimersSnapshot) {
 		final TypeSerializer<K> keySerializer = (TypeSerializer<K>) restoredTimersSnapshot.getKeySerializer();
 		final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) restoredTimersSnapshot.getNamespaceSerializer();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
similarity index 95%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index 957a53516ba..f2da6da3b05 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -61,10 +61,10 @@
 import static org.mockito.Mockito.verify;
 
 /**
- * Tests for {@link HeapInternalTimerService}.
+ * Tests for {@link InternalTimerServiceImpl}.
  */
 @RunWith(Parameterized.class)
-public class HeapInternalTimerServiceTest {
+public class InternalTimerServiceImplTest {
 
 	private final int maxParallelism;
 	private final KeyGroupRange testKeyGroupRange;
@@ -73,7 +73,7 @@
 		return any();
 	}
 
-	public HeapInternalTimerServiceTest(int startKeyGroup, int endKeyGroup, int maxParallelism) {
+	public InternalTimerServiceImplTest(int startKeyGroup, int endKeyGroup, int maxParallelism) {
 		this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, endKeyGroup);
 		this.maxParallelism = maxParallelism;
 	}
@@ -89,7 +89,7 @@ public void testKeyGroupStartIndexSetting() {
 
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
-		HeapInternalTimerService<Integer, String> service = createInternalTimerService(
+		InternalTimerServiceImpl<Integer, String> service = createInternalTimerService(
 			testKeyGroupList,
 			keyContext,
 			processingTimeService,
@@ -117,7 +117,7 @@ public void testTimerAssignmentToKeyGroups() {
 		final PriorityQueueSetFactory priorityQueueSetFactory =
 			createQueueFactory(keyGroupRange, totalNoOfKeyGroups);
 
-		HeapInternalTimerService<Integer, String> timerService = createInternalTimerService(
+		InternalTimerServiceImpl<Integer, String> timerService = createInternalTimerService(
 			keyGroupRange,
 			keyContext,
 			new TestProcessingTimeService(),
@@ -182,7 +182,7 @@ public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		PriorityQueueSetFactory priorityQueueSetFactory =
 			new HeapPriorityQueueSetFactory(testKeyGroupRange, maxParallelism, 128);
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, priorityQueueSetFactory);
 
 		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
@@ -244,7 +244,7 @@ public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() thr
 
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
@@ -277,7 +277,7 @@ public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysi
 
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
-		final HeapInternalTimerService<Integer, String> timerService =
+		final InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
@@ -328,7 +328,7 @@ public void testCurrentProcessingTime() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		processingTimeService.setCurrentTime(17L);
@@ -346,7 +346,7 @@ public void testCurrentEventTime() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		timerService.advanceWatermark(17);
@@ -366,7 +366,7 @@ public void testSetAndFireEventTimeTimers() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		// get two different keys
@@ -411,7 +411,7 @@ public void testSetAndFireProcessingTimeTimers() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		// get two different keys
@@ -458,7 +458,7 @@ public void testDeleteEventTimeTimers() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		// get two different keys
@@ -515,7 +515,7 @@ public void testDeleteProcessingTimeTimers() throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 				createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		// get two different keys
@@ -590,7 +590,7 @@ private void testSnapshotAndRestore(int snapshotVersion) throws Exception {
 
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
 
 		// get two different keys
@@ -665,7 +665,7 @@ private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Excep
 		TestKeyContext keyContext = new TestKeyContext();
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		final PriorityQueueSetFactory queueFactory = createQueueFactory();
-		HeapInternalTimerService<Integer, String> timerService =
+		InternalTimerServiceImpl<Integer, String> timerService =
 			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, queueFactory);
 
 		int midpoint = testKeyGroupRange.getStartKeyGroup() +
@@ -730,7 +730,7 @@ private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Excep
 		TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
 		TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();
 
-		HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService(
+		InternalTimerServiceImpl<Integer, String> timerService1 = restoreTimerService(
 			snapshot1,
 			snapshotVersion,
 			mockTriggerable1,
@@ -739,7 +739,7 @@ private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Excep
 			subKeyGroupRange1,
 			queueFactory);
 
-		HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService(
+		InternalTimerServiceImpl<Integer, String> timerService2 = restoreTimerService(
 			snapshot2,
 			snapshotVersion,
 			mockTriggerable2,
@@ -806,13 +806,13 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism
 		return result;
 	}
 
-	private static HeapInternalTimerService<Integer, String> createAndStartInternalTimerService(
+	private static InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService(
 			Triggerable<Integer, String> triggerable,
 			KeyContext keyContext,
 			ProcessingTimeService processingTimeService,
 			KeyGroupRange keyGroupList,
 			PriorityQueueSetFactory priorityQueueSetFactory) {
-		HeapInternalTimerService<Integer, String> service = createInternalTimerService(
+		InternalTimerServiceImpl<Integer, String> service = createInternalTimerService(
 			keyGroupList,
 			keyContext,
 			processingTimeService,
@@ -824,7 +824,7 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism
 		return service;
 	}
 
-	private static HeapInternalTimerService<Integer, String> restoreTimerService(
+	private static InternalTimerServiceImpl<Integer, String> restoreTimerService(
 			Map<Integer, byte[]> state,
 			int snapshotVersion,
 			Triggerable<Integer, String> triggerable,
@@ -834,7 +834,7 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism
 			PriorityQueueSetFactory priorityQueueSetFactory) throws Exception {
 
 		// create an empty service
-		HeapInternalTimerService<Integer, String> service = createInternalTimerService(
+		InternalTimerServiceImpl<Integer, String> service = createInternalTimerService(
 			keyGroupsList,
 			keyContext,
 			processingTimeService,
@@ -848,7 +848,7 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism
 				try (ByteArrayInputStream inputStream = new ByteArrayInputStream(state.get(keyGroupIndex))) {
 					InternalTimersSnapshot<?, ?> restoredTimersSnapshot =
 						InternalTimersSnapshotReaderWriters
-							.getReaderForVersion(snapshotVersion, HeapInternalTimerServiceTest.class.getClassLoader())
+							.getReaderForVersion(snapshotVersion, InternalTimerServiceImplTest.class.getClassLoader())
 							.readTimersSnapshot(new DataInputViewStreamWrapper(inputStream));
 
 					service.restoreTimersForKeyGroup(restoredTimersSnapshot, keyGroupIndex);
@@ -886,7 +886,7 @@ protected PriorityQueueSetFactory createQueueFactory(KeyGroupRange keyGroupRange
 		});
 	}
 
-	private static <K, N> HeapInternalTimerService<K, N> createInternalTimerService(
+	private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(
 		KeyGroupRange keyGroupsList,
 		KeyContext keyContext,
 		ProcessingTimeService processingTimeService,
@@ -896,7 +896,7 @@ protected PriorityQueueSetFactory createQueueFactory(KeyGroupRange keyGroupRange
 
 		TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
 
-		return new HeapInternalTimerService<>(
+		return new InternalTimerServiceImpl<>(
 			keyGroupsList,
 			keyContext,
 			processingTimeService,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index e9a2e45d260..4fa90206a75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -29,6 +29,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -167,26 +168,18 @@ private Configuration getConfiguration() throws Exception {
 				break;
 			}
 			case ROCKSDB_FULLY_ASYNC: {
-				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
-				rdb.setDbStoragePath(rocksDb);
-				this.stateBackend = rdb;
+				setupRocksDB(-1, false);
 				break;
 			}
 			case ROCKSDB_INCREMENTAL:
+				// Test the RocksDB-based timer service as well
+				config.setString(
+					RocksDBOptions.TIMER_SERVICE_FACTORY,
+					RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
+				setupRocksDB(16, true);
+				break;
 			case ROCKSDB_INCREMENTAL_ZK: {
-				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				// we use the fs backend with small threshold here to test the behaviour with file
-				// references, not self contained byte handles
-				RocksDBStateBackend rdb =
-					new RocksDBStateBackend(
-						new FsStateBackend(
-							new Path("file://" + backups).toUri(), 16),
-						true);
-				rdb.setDbStoragePath(rocksDb);
-				this.stateBackend = rdb;
+				setupRocksDB(16, true);
 				break;
 			}
 			default:
@@ -195,6 +188,20 @@ private Configuration getConfiguration() throws Exception {
 		return config;
 	}
 
+	private void setupRocksDB(int fileSizeThreshold, boolean incrementalCheckpoints) throws IOException {
+		String rocksDb = tempFolder.newFolder().getAbsolutePath();
+		String backups = tempFolder.newFolder().getAbsolutePath();
+		// we use the fs backend with small threshold here to test the behaviour with file
+		// references, not self contained byte handles
+		RocksDBStateBackend rdb =
+			new RocksDBStateBackend(
+				new FsStateBackend(
+					new Path("file://" + backups).toUri(), fileSizeThreshold),
+				incrementalCheckpoints);
+		rdb.setDbStoragePath(rocksDb);
+		this.stateBackend = rdb;
+	}
+
 	protected Configuration createClusterConfig() throws IOException {
 		TemporaryFolder temporaryFolder = new TemporaryFolder();
 		temporaryFolder.create();


 
