You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/03 17:32:52 UTC

[3/4] flink git commit: [FLINK-9491][state] Generalization of timer queue (decoupled from timers) and second implementation based on RocksDB

[FLINK-9491][state] Generalization of timer queue (decoupled from timers) and second implementation based on RocksDB

This closes #6228.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6ad421e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6ad421e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6ad421e

Branch: refs/heads/master
Commit: c6ad421e2b83fe8dc6031ce9716285fdf941d5e0
Parents: 8cb89f9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jun 18 14:38:01 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 3 19:32:08 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/util/CloseableIterator.java    |  86 ++++
 .../java/org/apache/flink/util/MathUtils.java   |  10 +
 .../org/apache/flink/util/MathUtilTest.java     |  11 +
 flink-runtime/pom.xml                           |   7 +-
 .../runtime/state/InternalPriorityQueue.java    |  99 ++++
 .../runtime/state/KeyExtractorFunction.java     |  36 ++
 .../runtime/state/KeyGroupPartitioner.java      |  13 -
 .../heap/CachingInternalPriorityQueueSet.java   | 288 ++++++++++++
 .../runtime/state/heap/HeapPriorityQueue.java   | 343 ++++++++++++++
 .../state/heap/HeapPriorityQueueElement.java    |  49 ++
 .../state/heap/HeapPriorityQueueSet.java        | 185 ++++++++
 .../heap/KeyGroupPartitionedPriorityQueue.java  | 280 +++++++++++
 .../runtime/state/heap/TreeOrderedSetCache.java | 128 +++++
 .../state/InternalPriorityQueueTestBase.java    | 466 +++++++++++++++++++
 .../state/KeyGroupPartitionerTestBase.java      |   8 +-
 ...CachingInternalPriorityQueueSetTestBase.java |  43 ++
 .../state/heap/HeapPriorityQueueSetTest.java    |  39 ++
 .../state/heap/HeapPriorityQueueTest.java       |  99 ++++
 .../KeyGroupPartitionedPriorityQueueTest.java   |  53 +++
 .../state/heap/OrderedSetCacheTestBase.java     | 102 ++++
 ...mpleCachingInternalPriorityQueueSetTest.java |  35 ++
 .../runtime/state/heap/TestOrderedStore.java    |  60 +++
 .../state/heap/TreeOrderedSetCacheTest.java     |  30 ++
 .../state/RocksDBKeySerializationUtils.java     |   4 +
 .../state/RocksDBKeyedStateBackend.java         |   3 +-
 .../streaming/state/RocksDBOrderedSetStore.java | 278 +++++++++++
 ...nalPriorityQueueSetWithRocksDBStoreTest.java |  66 +++
 ...tionedPriorityQueueWithRocksDBStoreTest.java |  53 +++
 .../state/RocksDBOrderedSetStoreTest.java       | 137 ++++++
 .../streaming/state/RocksDBResource.java        | 159 +++++++
 .../api/operators/HeapInternalTimerService.java |  82 ++--
 .../HeapPriorityQueueStateSnapshot.java         | 112 +++++
 .../api/operators/InternalTimerHeap.java        | 413 ----------------
 .../operators/InternalTimerHeapSnapshot.java    |  97 ----
 .../api/operators/InternalTimersSnapshot.java   |  16 +-
 .../InternalTimersSnapshotReaderWriters.java    |  16 +-
 .../api/operators/TimerHeapInternalTimer.java   |  70 ++-
 .../operators/HeapInternalTimerServiceTest.java |  14 +-
 .../api/operators/InternalTimerHeapTest.java    | 301 ------------
 39 files changed, 3383 insertions(+), 908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
new file mode 100644
index 0000000..09ea046
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+/**
+ * This interface represents an {@link Iterator} that is also {@link AutoCloseable}. A typical use-case for this
+ * interface is an iterator that is based on native resources such as files, network, or database connections. Clients
+ * must call {@link #close()} after using the iterator.
+ *
+ * @param <T> the type of iterated elements.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+	CloseableIterator<?> EMPTY_INSTANCE = CloseableIterator.adapterForIterator(Collections.emptyIterator());
+
+	@Nonnull
+	static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator) {
+		return new IteratorAdapter<>(iterator);
+	}
+
+	@SuppressWarnings("unchecked")
+	static <T> CloseableIterator<T> empty() {
+		return (CloseableIterator<T>) EMPTY_INSTANCE;
+	}
+
+	/**
+	 * Adapter from {@link Iterator} to {@link CloseableIterator}. Does nothing on {@link #close()}.
+	 *
+	 * @param <E> the type of iterated elements.
+	 */
+	final class IteratorAdapter<E> implements CloseableIterator<E> {
+
+		@Nonnull
+		private final Iterator<E> delegate;
+
+		IteratorAdapter(@Nonnull Iterator<E> delegate) {
+			this.delegate = delegate;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return delegate.hasNext();
+		}
+
+		@Override
+		public E next() {
+			return delegate.next();
+		}
+
+		@Override
+		public void remove() {
+			delegate.remove();
+		}
+
+		@Override
+		public void forEachRemaining(Consumer<? super E> action) {
+			delegate.forEachRemaining(action);
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 48c32a3..96fc3c4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -197,6 +197,16 @@ public final class MathUtils {
 		return in;
 	}
 
+	/**
+	 * Flips the sign bit (most-significant-bit) of the input.
+	 *
+	 * @param in the input value.
+	 * @return the input with a flipped sign bit (most-significant-bit).
+	 */
+	public static long flipSignBit(long in) {
+		return in ^ Long.MIN_VALUE;
+	}
+
 	// ============================================================================================
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
index 82c17e9..17a915c 100644
--- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -131,4 +132,14 @@ public class MathUtilTest {
 		assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE));
 		assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE));
 	}
+
+	@Test
+	public void testFlipSignBit() {
+		Assert.assertEquals(0L, MathUtils.flipSignBit(Long.MIN_VALUE));
+		Assert.assertEquals(Long.MIN_VALUE, MathUtils.flipSignBit(0L));
+		Assert.assertEquals(-1L, MathUtils.flipSignBit(Long.MAX_VALUE));
+		Assert.assertEquals(Long.MAX_VALUE, MathUtils.flipSignBit(-1L));
+		Assert.assertEquals(42L | Long.MIN_VALUE, MathUtils.flipSignBit(42L));
+		Assert.assertEquals(-42L & Long.MAX_VALUE, MathUtils.flipSignBit(-42L));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 9a4034f..cfc57c0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -116,6 +116,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>it.unimi.dsi</groupId>
+			<artifactId>fastutil</artifactId>
+			<version>8.2.1</version>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scala-lang</groupId>
 			<artifactId>scala-library</artifactId>
 		</dependency>
@@ -306,7 +312,6 @@ under the License.
 			<groupId>org.reflections</groupId>
 			<artifactId>reflections</artifactId>
 		</dependency>
-
 	</dependencies>
 
 	<!-- Dependency Management to converge transitive dependency versions -->

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
new file mode 100644
index 0000000..fb3ee82
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for a collection that gives in-order access to elements w.r.t. their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+	/**
+	 * Retrieves and removes the first element (w.r.t. the order) of this set,
+	 * or returns {@code null} if this set is empty.
+	 *
+	 * @return the first element of this ordered set, or {@code null} if this set is empty.
+	 */
+	@Nullable
+	T poll();
+
+	/**
+	 * Retrieves, but does not remove, the first element (w.r.t. the order) of this set,
+	 * or returns {@code null} if this set is empty.
+	 *
+	 * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+	 */
+	@Nullable
+	T peek();
+
+	/**
+	 * Adds the given element to the set, if it is not already contained.
+	 *
+	 * @param toAdd the element to add to the set.
+	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+	 * Only returns <code>false</code> if the head element was not changed by this operation.
+	 */
+	boolean add(@Nonnull T toAdd);
+
+	/**
+	 * Removes the given element from the set, if it is contained in the set.
+	 *
+	 * @param toRemove the element to remove.
+	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+	 * Only returns <code>false</code> if the head element was not changed by this operation.
+	 */
+	boolean remove(@Nonnull T toRemove);
+
+	/**
+	 * Checks whether the set is empty.
+	 *
+	 * @return true if the set is empty, i.e. no element is contained.
+	 */
+	boolean isEmpty();
+
+	/**
+	 * Returns the number of elements in this set.
+	 *
+	 * @return the number of elements in this set.
+	 */
+	@Nonnegative
+	int size();
+
+	/**
+	 * Adds all the given elements to the set.
+	 */
+	void addAll(@Nullable Collection<? extends T> toAdd);
+
+	/**
+	 * Iterator over all elements, no order guaranteed. Iterator must be closed after usage.
+	 */
+	@Nonnull
+	CloseableIterator<T> iterator();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
new file mode 100644
index 0000000..a3ce11c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Function to extract a key from a given object.
+ *
+ * @param <T> type of the element from which we extract the key.
+ */
+@FunctionalInterface
+public interface KeyExtractorFunction<T> {
+
+	/**
+	 * Returns the key for the given element by which the key-group can be computed.
+	 */
+	@Nonnull
+	Object extractKeyFromElement(@Nonnull T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 673a0ef..6a9dfb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -265,19 +265,6 @@ public class KeyGroupPartitioner<T> {
 	}
 
 	/**
-	 * @param <T> type of the element from which we extract the key.
-	 */
-	@FunctionalInterface
-	public interface KeyExtractorFunction<T> {
-
-		/**
-		 * Returns the key for the given element by which the key-group can be computed.
-		 */
-		@Nonnull
-		Object extractKeyFromElement(@Nonnull T element);
-	}
-
-	/**
 	 * This functional interface defines how one element is written to a {@link DataOutputView}.
 	 *
 	 * @param <T> type of the written elements.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
new file mode 100644
index 0000000..771315d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store that manages the ground
+ * truth about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains an ordered
+ * (partial) view on the top elements in the ordered store. We are currently applying simple write-through to keep cache
+ * and store in sync on updates and refill the cache from the store when it is empty and we expect more elements
+ * contained in the store.
+ *
+ * @param <E> the type of the managed elements.
+ */
+public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+	/** An ordered set cache that contains a (partial) view on the top elements in the ordered store. */
+	@Nonnull
+	private final OrderedSetCache<E> orderedCache;
+
+	/** A store with ordered set semantics that contains the ground truth of all inserted elements. */
+	@Nonnull
+	private final OrderedSetStore<E> orderedStore;
+
+	/** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
+	private boolean storeOnlyElements;
+
+	/** Management data for the {@link HeapPriorityQueueElement} trait. */
+	private int pqManagedIndex;
+
+	@SuppressWarnings("unchecked")
+	public CachingInternalPriorityQueueSet(
+		@Nonnull OrderedSetCache<E> orderedCache,
+		@Nonnull OrderedSetStore<E> orderedStore) {
+		this.orderedCache = orderedCache;
+		this.orderedStore = orderedStore;
+		// We are careful and set this to true. Could be set to false if we know for sure that the store is empty, but
+		// checking this could be an expensive operation.
+		this.storeOnlyElements = true;
+		this.pqManagedIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+	}
+
+	@Nullable
+	@Override
+	public E peek() {
+
+		checkRefillCacheFromStore();
+
+		return orderedCache.peekFirst();
+	}
+
+	@Nullable
+	@Override
+	public E poll() {
+
+		checkRefillCacheFromStore();
+
+		final E first = orderedCache.removeFirst();
+
+		if (first != null) {
+			// write-through sync
+			orderedStore.remove(first);
+		}
+
+		return first;
+	}
+
+	@Override
+	public boolean add(@Nonnull E toAdd) {
+
+		checkRefillCacheFromStore();
+
+		// write-through sync
+		orderedStore.add(toAdd);
+
+		final boolean cacheFull = orderedCache.isFull();
+
+		if ((!cacheFull && !storeOnlyElements) || orderedCache.isInLowerBound(toAdd)) {
+
+			if (cacheFull) {
+				// we drop the element with lowest priority from the cache
+				orderedCache.removeLast();
+				// the dropped element is now only in the store
+				storeOnlyElements = true;
+			}
+
+			orderedCache.add(toAdd);
+			return toAdd.equals(orderedCache.peekFirst());
+		} else {
+			// we only added to the store
+			storeOnlyElements = true;
+			return false;
+		}
+	}
+
+	@Override
+	public boolean remove(@Nonnull E toRemove) {
+
+		checkRefillCacheFromStore();
+
+		boolean newHead = toRemove.equals(orderedCache.peekFirst());
+		// write-through sync
+		orderedStore.remove(toRemove);
+		orderedCache.remove(toRemove);
+		return newHead;
+	}
+
+	@Override
+	public void addAll(@Nullable Collection<? extends E> toAdd) {
+
+		if (toAdd == null) {
+			return;
+		}
+
+		for (E element : toAdd) {
+			add(element);
+		}
+	}
+
+	@Override
+	public int size() {
+		return orderedStore.size();
+	}
+
+	@Override
+	public boolean isEmpty() {
+		checkRefillCacheFromStore();
+		return orderedCache.isEmpty();
+	}
+
+	@Nonnull
+	@Override
+	public CloseableIterator<E> iterator() {
+		return orderedStore.orderedIterator();
+	}
+
+	@Override
+	public int getInternalIndex() {
+		return pqManagedIndex;
+	}
+
+	@Override
+	public void setInternalIndex(int updateIndex) {
+		this.pqManagedIndex = updateIndex;
+	}
+
+	/**
+	 * Refills the cache from the store when the cache is empty and we expect more elements in the store.
+	 *
+	 * TODO: We can think about exploiting the property that the store is already sorted when bulk-filling the cache.
+	 */
+	private void checkRefillCacheFromStore() {
+		if (storeOnlyElements && orderedCache.isEmpty()) {
+			try (final CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+				while (iterator.hasNext() && !orderedCache.isFull()) {
+					orderedCache.add(iterator.next());
+				}
+				storeOnlyElements = iterator.hasNext();
+			} catch (Exception e) {
+				throw new FlinkRuntimeException("Exception while closing RocksDB iterator.", e);
+			}
+		}
+	}
+
+	/**
+	 * Interface for an ordered cache with set semantics of elements to be used in
+	 * {@link CachingInternalPriorityQueueSet}. Cache implementations typically have a limited capacity as indicated
+	 * via the {@link #isFull()} method.
+	 *
+	 * @param <E> the type of contained elements.
+	 */
+	public interface OrderedSetCache<E> {
+
+		/**
+		 * Adds the given element to the cache (if not yet contained). This method should only be called if the cache
+		 * is not full.
+		 * @param element element to add to the cache.
+		 */
+		void add(@Nonnull E element);
+
+		/**
+		 * Removes the given element from the cache (if contained).
+		 * @param element element to remove from the cache.
+		 */
+		void remove(@Nonnull E element);
+
+		/**
+		 * Returns <code>true</code> if the cache is full and no more elements can be added.
+		 */
+		boolean isFull();
+
+		/**
+		 * Returns <code>true</code> if the cache is empty, i.e. contains no elements.
+		 */
+		boolean isEmpty();
+
+		/**
+		 * Returns true if the element compares smaller than the currently largest element in the cache.
+		 */
+		boolean isInLowerBound(@Nonnull E toCheck);
+
+		/**
+		 * Removes and returns the first (smallest) element from the cache.
+		 */
+		@Nullable
+		E removeFirst();
+
+		/**
+		 * Removes and returns the last (largest) element from the cache.
+		 */
+		@Nullable
+		E removeLast();
+
+		/**
+		 * Returns the first (smallest) element from the cache (without removing it).
+		 */
+		@Nullable
+		E peekFirst();
+
+		/**
+		 * Returns the last (largest) element from the cache (without removing it).
+		 */
+		@Nullable
+		E peekLast();
+	}
+
+	/**
+	 * Interface for an ordered store with set semantics of elements to be used in
+	 * {@link CachingInternalPriorityQueueSet}. Stores are assumed to have (practically) unlimited capacity, but their
+	 * operations could all be expensive.
+	 *
+	 * @param <E> the type of contained elements.
+	 */
+	public interface OrderedSetStore<E> {
+
+		/**
+		 * Adds the given element to the store (if not yet contained).
+		 * @param element element to add to the store.
+		 */
+		void add(@Nonnull E element);
+
+		/**
+		 * Removes the given element from the store (if contained).
+		 * @param element element to remove from the store.
+		 */
+		void remove(@Nonnull E element);
+
+		/**
+		 * Returns the number of elements in the store.
+		 */
+		@Nonnegative
+		int size();
+
+		/**
+		 * Returns an iterator over the store that returns elements in order. The iterator must be closed by the client
+		 * after usage.
+		 */
+		@Nonnull
+		CloseableIterator<E> orderedIterator();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
new file mode 100644
index 0000000..7017905
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals. We use the managed index from {@link HeapPriorityQueueElement} to find an element in the queue
+ * array to support fast deletes.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+	/**
+	 * The index of the head element in the array that represents the heap.
+	 */
+	private static final int QUEUE_HEAD_INDEX = 1;
+
+	/**
+	 * Comparator for the contained elements.
+	 */
+	private final Comparator<T> elementComparator;
+
+	/**
+	 * The array that represents the heap-organized priority queue.
+	 */
+	private T[] queue;
+
+	/**
+	 * The current size of the priority queue.
+	 */
+	private int size;
+
+	/**
+	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+	 *
+	 * @param elementComparator comparator for the contained elements.
+	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
+	 */
+	@SuppressWarnings("unchecked")
+	public HeapPriorityQueue(
+		@Nonnull Comparator<T> elementComparator,
+		@Nonnegative int minimumCapacity) {
+
+		this.elementComparator = elementComparator;
+		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+	}
+
+	@Override
+	@Nullable
+	public T poll() {
+		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+	}
+
+	@Override
+	@Nullable
+	public T peek() {
+		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+	}
+
+	/**
+	 * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+	 *
+	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+	 * Only returns <code>false</code> if the head element was not changed by this operation.
+	 */
+	@Override
+	public boolean add(@Nonnull T toAdd) {
+		return addInternal(toAdd);
+	}
+
+	/**
+	 * This remove is based on object identity, not the result of equals. We use the objects managed index to find
+	 * the instance in the queue array.
+	 *
+	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+	 * Only returns <code>false</code> if the head element was not changed by this operation.
+	 */
+	@Override
+	public boolean remove(@Nonnull T toRemove) {
+		return removeInternal(toRemove);
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return size() == 0;
+	}
+
+	@Override
+	@Nonnegative
+	public int size() {
+		return size;
+	}
+
+	public void clear() {
+		size = 0;
+		Arrays.fill(queue, null);
+	}
+
+	@SuppressWarnings({"unchecked"})
+	@Nonnull
+	public <O> O[] toArray(O[] out) {
+		if (out.length < size) {
+			return (O[]) Arrays.copyOfRange(queue, QUEUE_HEAD_INDEX, QUEUE_HEAD_INDEX + size, out.getClass());
+		} else {
+			System.arraycopy(queue, QUEUE_HEAD_INDEX, out, 0, size);
+			if (out.length > size) {
+				out[size] = null;
+			}
+			return out;
+		}
+	}
+
+	/**
+	 * Returns an iterator over the elements in this queue. The iterator
+	 * does not return the elements in any particular order.
+	 *
+	 * @return an iterator over the elements in this queue.
+	 */
+	@Nonnull
+	public CloseableIterator<T> iterator() {
+		return new HeapIterator();
+	}
+
+	@Override
+	public void addAll(@Nullable Collection<? extends T> restoredElements) {
+
+		if (restoredElements == null) {
+			return;
+		}
+
+		resizeForBulkLoad(restoredElements.size());
+
+		for (T element : restoredElements) {
+			add(element);
+		}
+	}
+
+	private boolean addInternal(@Nonnull T element) {
+		final int newSize = increaseSizeByOne();
+		moveElementToIdx(element, newSize);
+		siftUp(newSize);
+		return element.getInternalIndex() == QUEUE_HEAD_INDEX;
+	}
+
+	/**
+	 * Removes the element that is stored at the internal index reported by the given element.
+	 *
+	 * @param elementToRemove the element to remove.
+	 * @return <code>true</code> if the removed element was stored at the head position.
+	 */
+	private boolean removeInternal(@Nonnull T elementToRemove) {
+		final int removeIdx = elementToRemove.getInternalIndex();
+		final boolean wasHead = (QUEUE_HEAD_INDEX == removeIdx);
+		removeElementAtIndex(removeIdx);
+		return wasHead;
+	}
+
+	/**
+	 * Removes and returns the element at the given array index. Unless the removed slot is the last occupied one,
+	 * the last element is moved into the gap and then sifted to re-establish the heap order.
+	 *
+	 * @param removeIdx index of the element to remove in the internal array.
+	 * @return the element that was stored at the given index.
+	 */
+	private T removeElementAtIndex(int removeIdx) {
+		final T[] heap = this.queue;
+		final int lastIdx = size;
+		final T removed = heap[removeIdx];
+
+		assert removed.getInternalIndex() == removeIdx;
+
+		if (lastIdx != removeIdx) {
+			// fill the gap with the last element and repair the heap around it
+			final T lastElement = heap[lastIdx];
+			moveElementToIdx(lastElement, removeIdx);
+			adjustElementAtIndex(lastElement, removeIdx);
+		}
+
+		// the last slot is no longer occupied
+		heap[lastIdx] = null;
+		size = lastIdx - 1;
+
+		return removed;
+	}
+
+	/**
+	 * Re-establishes the heap order after the priority of the given element has changed.
+	 *
+	 * <p>The call has no effect if the element is not stored in this queue at the position reported by its internal
+	 * index. This includes elements whose index lies outside the occupied part of the internal array, e.g.
+	 * {@link HeapPriorityQueueElement#NOT_CONTAINED} or a stale index of an element that was removed earlier.
+	 *
+	 * @param element the element whose position in the heap must be adjusted.
+	 */
+	public void adjustModifiedElement(@Nonnull T element) {
+		final int elementIndex = element.getInternalIndex();
+		// Check the range first: an index outside [QUEUE_HEAD_INDEX, size] must not be used to access the array.
+		if (elementIndex >= QUEUE_HEAD_INDEX
+			&& elementIndex <= size
+			&& element == queue[elementIndex]) {
+			adjustElementAtIndex(element, elementIndex);
+		}
+	}
+
+	private void adjustElementAtIndex(T element, int index) {
+		siftDown(index);
+		if (queue[index] == element) {
+			siftUp(index);
+		}
+	}
+
+	/**
+	 * Moves the element at the given index towards the head for as long as it compares less than its parent.
+	 *
+	 * @param idx index of the element to sift up.
+	 */
+	private void siftUp(int idx) {
+		final T[] heap = this.queue;
+		final T movingElement = heap[idx];
+		int position = idx;
+
+		for (int parentIdx = position >>> 1; parentIdx > 0; parentIdx >>>= 1) {
+			final T parent = heap[parentIdx];
+			if (!isElementLessThen(movingElement, parent)) {
+				break;
+			}
+			// pull the parent one level down and continue from its old slot
+			moveElementToIdx(parent, position);
+			position = parentIdx;
+		}
+
+		moveElementToIdx(movingElement, position);
+	}
+
+	/**
+	 * Moves the element at the given index away from the head for as long as its smaller child compares less than
+	 * the element itself.
+	 *
+	 * @param idx index of the element to sift down.
+	 */
+	private void siftDown(int idx) {
+		final T[] heap = this.queue;
+		final int heapSize = this.size;
+		final T movingElement = heap[idx];
+		int position = idx;
+
+		while (true) {
+			// pick the smaller one of the (up to) two children
+			int childIdx = position << 1;
+			final int siblingIdx = childIdx + 1;
+			if (isElementIndexValid(siblingIdx, heapSize) &&
+				isElementLessThen(heap[siblingIdx], heap[childIdx])) {
+				childIdx = siblingIdx;
+			}
+
+			// stop when there is no child or the smaller child is not less than the moving element
+			if (!isElementIndexValid(childIdx, heapSize) ||
+				!isElementLessThen(heap[childIdx], movingElement)) {
+				break;
+			}
+
+			moveElementToIdx(heap[childIdx], position);
+			position = childIdx;
+		}
+
+		moveElementToIdx(movingElement, position);
+	}
+
+	/**
+	 * Checks that the given index does not lie behind the last occupied slot of a heap with the given size.
+	 *
+	 * @param elementIndex the index to check.
+	 * @param heapSize the number of elements in the heap.
+	 * @return <code>true</code> if the index is not greater than the heap size.
+	 */
+	private boolean isElementIndexValid(int elementIndex, int heapSize) {
+		return heapSize >= elementIndex;
+	}
+
+	private boolean isElementLessThen(T a, T b) {
+		return elementComparator.compare(a, b) < 0;
+	}
+
+	/**
+	 * Stores the element in the given slot of the internal array and records that slot as the element's internal
+	 * index.
+	 *
+	 * @param element the element to place.
+	 * @param idx the target slot in the internal array.
+	 */
+	private void moveElementToIdx(T element, int idx) {
+		this.queue[idx] = element;
+		element.setInternalIndex(idx);
+	}
+
+	/**
+	 * Increments the size of this queue by one and grows the internal array if the new size no longer fits. Arrays
+	 * shorter than 64 slots grow by their length plus two, larger arrays grow by half of their length.
+	 *
+	 * @return the new size, which is also the index of the newly occupied slot.
+	 */
+	private int increaseSizeByOne() {
+		final int currentCapacity = queue.length;
+		final int requiredSize = ++size;
+
+		if (requiredSize >= currentCapacity) {
+			final int growBy;
+			if (currentCapacity < 64) {
+				growBy = currentCapacity + 2;
+			} else {
+				growBy = currentCapacity >> 1;
+			}
+			resizeQueueArray(currentCapacity + growBy, requiredSize);
+		}
+		// TODO implement shrinking as well?
+		return requiredSize;
+	}
+
+	private void resizeForBulkLoad(int totalSize) {
+		if (totalSize > queue.length) {
+			int desiredSize = totalSize + (totalSize >>> 3);
+			resizeQueueArray(desiredSize, totalSize);
+		}
+	}
+
+	private void resizeQueueArray(int desiredSize, int minRequiredSize) {
+		if (isValidArraySize(desiredSize)) {
+			queue = Arrays.copyOf(queue, desiredSize);
+		} else if (isValidArraySize(minRequiredSize)) {
+			queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
+		} else {
+			throw new OutOfMemoryError("Required minimum heap size " + minRequiredSize +
+				" exceeds maximum size of " + MAX_ARRAY_SIZE + ".");
+		}
+	}
+
+	/**
+	 * Checks that the given size is neither negative (e.g. after an integer overflow) nor larger than the maximum
+	 * array size.
+	 *
+	 * @param size the array size to check.
+	 * @return <code>true</code> if an array of the given size may be requested.
+	 */
+	private static boolean isValidArraySize(int size) {
+		return 0 <= size && MAX_ARRAY_SIZE >= size;
+	}
+
+	/**
+	 * {@link Iterator} implementation for {@link HeapPriorityQueue} that walks the internal array from the head slot
+	 * up to the current size of the queue. {@link Iterator#remove()} is not supported.
+	 */
+	private class HeapIterator implements CloseableIterator<T> {
+
+		/** Index in the internal array of the element that the next call to {@link #next()} returns. */
+		private int nextIdx;
+
+		HeapIterator() {
+			this.nextIdx = QUEUE_HEAD_INDEX;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return nextIdx <= size;
+		}
+
+		@Override
+		public T next() {
+			if (!hasNext()) {
+				throw new NoSuchElementException("Iterator has no next element.");
+			}
+			return queue[nextIdx++];
+		}
+
+		/** Does nothing. */
+		@Override
+		public void close() {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
new file mode 100644
index 0000000..a84758b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for objects that can be managed by a {@link HeapPriorityQueue}. Such an object can only be contained in at
+ * most one {@link HeapPriorityQueue} at a time, because it stores a single index into the internal array of the queue
+ * that manages it.
+ */
+@Internal
+public interface HeapPriorityQueueElement {
+
+	/**
+	 * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
+	 * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
+	 * elements are removed from a {@link HeapPriorityQueue}, so a different value does not prove that the object is
+	 * currently contained in a queue.
+	 */
+	int NOT_CONTAINED = Integer.MIN_VALUE;
+
+	/**
+	 * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
+	 *
+	 * @return the index that was last passed to {@link #setInternalIndex(int)}.
+	 */
+	int getInternalIndex();
+
+	/**
+	 * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
+	 * {@link HeapPriorityQueue}.
+	 *
+	 * @param newIndex the new index in the internal array of the owning queue.
+	 */
+	void setInternalIndex(int newIndex);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
new file mode 100644
index 0000000..61313e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by hash
+ * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ *  <li>We could also implement shrinking for the heap and the deduplication set.</li>
+ *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
+
+	/**
+	 * Function to extract the key from contained elements.
+	 */
+	private final KeyExtractorFunction<T> keyExtractor;
+
+	/**
+	 * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements.
+	 */
+	private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
+
+	/**
+	 * The key-group range of elements that are managed by this queue.
+	 */
+	private final KeyGroupRange keyGroupRange;
+
+	/**
+	 * The total number of key-groups of the job.
+	 */
+	private final int totalNumberOfKeyGroups;
+
+	/**
+	 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
+	 *
+	 * @param elementComparator comparator for the contained elements.
+	 * @param keyExtractor function to extract a key from the contained elements.
+	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
+	 * @param keyGroupRange the key-group range of the elements in this set.
+	 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
+	 */
+	@SuppressWarnings("unchecked")
+	public HeapPriorityQueueSet(
+		@Nonnull Comparator<T> elementComparator,
+		@Nonnull KeyExtractorFunction<T> keyExtractor,
+		@Nonnegative int minimumCapacity,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int totalNumberOfKeyGroups) {
+
+		super(elementComparator, minimumCapacity);
+
+		this.keyExtractor = keyExtractor;
+
+		this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+		this.keyGroupRange = keyGroupRange;
+
+		final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
+		final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
+		this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
+		for (int i = 0; i < keyGroupsInLocalRange; ++i) {
+			deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
+		}
+	}
+
+	@Override
+	@Nullable
+	public T poll() {
+		final T toRemove = super.poll();
+		if (toRemove != null) {
+			return getDedupMapForElement(toRemove).remove(toRemove);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
+	 * no such element is already contained (determined by {@link #equals(Object)}).
+	 *
+	 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+	 * Only returns <code>false</code> iff the head element was not changed by this operation.
+	 */
+	@Override
+	public boolean add(@Nonnull T element) {
+		return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
+	}
+
+	/**
+	 * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
+	 * via {@link #equals(Object)}.
+	 *
+	 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+	 * Only returns <code>false</code> iff the head element was not changed by this operation.
+	 */
+	@Override
+	public boolean remove(@Nonnull T toRemove) {
+		T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
+		return storedElement != null && super.remove(storedElement);
+	}
+
+	@Override
+	public void clear() {
+		super.clear();
+		for (HashMap<?, ?> elementHashMap : deduplicationMapsByKeyGroup) {
+			elementHashMap.clear();
+		}
+	}
+
+	/**
+	 * Returns an unmodifiable set of all elements in the given key-group.
+	 */
+	@Nonnull
+	public Set<T> getElementsForKeyGroup(@Nonnegative int keyGroupIdx) {
+		return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet());
+	}
+
+	@VisibleForTesting
+	@SuppressWarnings("unchecked")
+	@Nonnull
+	public List<Set<T>> getElementsByKeyGroup() {
+		List<Set<T>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length);
+		for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) {
+			result.add(i, Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet()));
+		}
+		return result;
+	}
+
+	private HashMap<T, T> getDedupMapForKeyGroup(
+		@Nonnegative int keyGroupIdx) {
+		return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)];
+	}
+
+	private HashMap<T, T> getDedupMapForElement(T element) {
+		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
+			keyExtractor.extractKeyFromElement(element),
+			totalNumberOfKeyGroups);
+		return getDedupMapForKeyGroup(keyGroup);
+	}
+
+	private int globalKeyGroupToLocalIndex(int keyGroup) {
+		checkArgument(keyGroupRange.contains(keyGroup));
+		return keyGroup - keyGroupRange.getStartKeyGroup();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
new file mode 100644
index 0000000..af4d54f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+	implements InternalPriorityQueue<T> {
+
+	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+	@Nonnull
+	private final HeapPriorityQueue<PQ> heapOfkeyGroupedHeaps;
+
+	/** All sub-heaps from heapOfkeyGroupedHeaps, indexed by their key-group id, relative to firstKeyGroup. */
+	@Nonnull
+	private final PQ[] keyGroupedHeaps;
+
+	/** Function to extract the key from contained elements. */
+	@Nonnull
+	private final KeyExtractorFunction<T> keyExtractor;
+
+	/** The total number of key-groups (in the job). */
+	@Nonnegative
+	private final int totalKeyGroups;
+
+	/** The smallest key-group id with a subpartition managed by this ordered set. */
+	@Nonnegative
+	private final int firstKeyGroup;
+
+	/**
+	 * Creates a new queue with one sub-queue per key-group of the given range. All sub-queues are created through the
+	 * given factory and registered in the heap-of-heaps right away.
+	 *
+	 * @param keyExtractor function to extract the key from contained elements.
+	 * @param elementComparator comparator for the elements; also used to order the sub-queues by their heads.
+	 * @param orderedCacheFactory factory that creates the sub-queue for each key-group.
+	 * @param keyGroupRange the key-group range for which sub-queues are created.
+	 * @param totalKeyGroups the total number of key-groups (in the job).
+	 */
+	@SuppressWarnings("unchecked")
+	public KeyGroupPartitionedPriorityQueue(
+		@Nonnull KeyExtractorFunction<T> keyExtractor,
+		@Nonnull Comparator<T> elementComparator,
+		@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+		@Nonnull KeyGroupRange keyGroupRange,
+		@Nonnegative int totalKeyGroups) {
+
+		this.keyExtractor = keyExtractor;
+		this.totalKeyGroups = totalKeyGroups;
+		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+		this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+		this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>(
+			new InternalPriorityQueueComparator<>(elementComparator),
+			keyGroupRange.getNumberOfKeyGroups());
+		for (int i = 0; i < keyGroupedHeaps.length; i++) {
+			final PQ keyGroupSubHeap =
+				orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
+			keyGroupedHeaps[i] = keyGroupSubHeap;
+			heapOfkeyGroupedHeaps.add(keyGroupSubHeap);
+		}
+	}
+
+	/**
+	 * Polls the head of the sub-queue that currently is the head of the heap-of-heaps and then re-establishes the
+	 * position of that sub-queue, because its head has changed.
+	 */
+	@Nullable
+	@Override
+	public T poll() {
+		final PQ headList = heapOfkeyGroupedHeaps.peek();
+		final T head = headList.poll();
+		heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
+		return head;
+	}
+
+	/**
+	 * Returns the head of the sub-queue that currently is the head of the heap-of-heaps, without removing it.
+	 */
+	@Nullable
+	@Override
+	public T peek() {
+		return heapOfkeyGroupedHeaps.peek().peek();
+	}
+
+	/**
+	 * Adds the element to the sub-queue of its key-group.
+	 *
+	 * @return <code>true</code> if the added element equals the head of this queue after the operation.
+	 */
+	@Override
+	public boolean add(@Nonnull T toAdd) {
+		final PQ list = getKeyGroupSubHeapForElement(toAdd);
+
+		// the branch checks if the head element has (potentially) changed.
+		if (list.add(toAdd)) {
+			heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+			// could we have a new head?
+			return toAdd.equals(peek());
+		} else {
+			// head unchanged
+			return false;
+		}
+	}
+
+	/**
+	 * Removes the element from the sub-queue of its key-group.
+	 *
+	 * @return <code>true</code> if the removed element equals the head of this queue before the operation.
+	 */
+	@Override
+	public boolean remove(@Nonnull T toRemove) {
+		final PQ list = getKeyGroupSubHeapForElement(toRemove);
+
+		final T oldHead = peek();
+
+		// the branch checks if the head element has (potentially) changed.
+		if (list.remove(toRemove)) {
+			heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+			// could we have a new head?
+			return toRemove.equals(oldHead);
+		} else {
+			// head unchanged
+			return false;
+		}
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return peek() == null;
+	}
+
+	/**
+	 * Returns the sum of the sizes of all sub-queues. This iterates over all sub-queues on every call.
+	 */
+	@Override
+	public int size() {
+		int sizeSum = 0;
+		for (PQ list : keyGroupedHeaps) {
+			sizeSum += list.size();
+		}
+		return sizeSum;
+	}
+
+	@Override
+	public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+		if (toAdd == null) {
+			return;
+		}
+
+		// TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
+		for (T element : toAdd) {
+			add(element);
+		}
+	}
+
+	/**
+	 * Returns an iterator that concatenates the iterators of all sub-queues. Using code must close the iterator.
+	 */
+	@Nonnull
+	@Override
+	public CloseableIterator<T> iterator() {
+		return new KeyGroupConcatenationIterator<>(keyGroupedHeaps);
+	}
+
+	private PQ getKeyGroupSubHeapForElement(T element) {
+		return keyGroupedHeaps[computeKeyGroupIndex(element)];
+	}
+
+	/**
+	 * Computes the index into {@link #keyGroupedHeaps} for the key-group to which the key of the element is assigned.
+	 */
+	private int computeKeyGroupIndex(T element) {
+		final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
+		final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+		return keyGroupId - firstKeyGroup;
+	}
+
+	/**
+	 * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements.
+	 * Using code must {@link #close()} after usage.
+	 *
+	 * @param <T> the type of iterated elements.
+	 */
+	private static final class KeyGroupConcatenationIterator<
+		T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+		implements CloseableIterator<T> {
+
+		/** Array with the subpartitions that we iterate. No null values in the array. */
+		@Nonnull
+		private final PQS[] keyGroupLists;
+
+		/** Index of the subpartition that is opened next, i.e. one past the subpartition that is currently iterated. */
+		@Nonnegative
+		private int index;
+
+		/** The iterator of the current subpartition. */
+		@Nonnull
+		private CloseableIterator<T> current;
+
+		private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
+			this.keyGroupLists = keyGroupLists;
+			this.index = 0;
+			this.current = CloseableIterator.empty();
+		}
+
+		/**
+		 * Checks for a next element. If the current subpartition is exhausted, its iterator is closed and this method
+		 * advances to the next subpartition that has elements.
+		 */
+		@Override
+		public boolean hasNext() {
+			boolean currentHasNext = current.hasNext();
+
+			// find the iterator of the next partition that has elements.
+			while (!currentHasNext && index < keyGroupLists.length) {
+				IOUtils.closeQuietly(current);
+				current = keyGroupLists[index++].iterator();
+				currentHasNext = current.hasNext();
+			}
+			return currentHasNext;
+		}
+
+		/**
+		 * Returns the next element. This also works without a preceding call to {@link #hasNext()}.
+		 *
+		 * @throws java.util.NoSuchElementException if all subpartitions are exhausted.
+		 */
+		@Override
+		public T next() {
+			// hasNext() moves on to the next non-empty subpartition if the current one is exhausted. Without this
+			// call, next() would fail on an exhausted subpartition even though later subpartitions have elements.
+			if (!hasNext()) {
+				throw new java.util.NoSuchElementException("Iterator has no next element.");
+			}
+			return current.next();
+		}
+
+		@Override
+		public void close() throws Exception {
+			current.close();
+		}
+	}
+
+	/**
+	 * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
+	 * from {@link #peek()}: a queue without head element is ordered after every queue that has a head element.
+	 *
+	 * @param <T> type of the elements in the compared queues.
+	 * @param <Q> type of queue.
+	 */
+	private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
+		implements Comparator<Q> {
+
+		/** Comparator for the queue elements, so we can compare their heads. */
+		@Nonnull
+		private final Comparator<T> elementComparator;
+
+		InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
+			this.elementComparator = elementComparator;
+		}
+
+		@Override
+		public int compare(Q o1, Q o2) {
+			final T left = o1.peek();
+			final T right = o2.peek();
+			if (left == null) {
+				return (right == null ? 0 : 1);
+			} else {
+				return (right == null ? -1 : elementComparator.compare(left, right));
+			}
+		}
+	}
+
+	/**
+	 * Factory that produces the sub-queues that represent the partitions of a {@link KeyGroupPartitionedPriorityQueue}.
+	 *
+	 * @param <T> type of the elements in the queue set.
+	 * @param <PQS> type of the priority queue. Must have set semantics and {@link HeapPriorityQueueElement}.
+	 */
+	public interface PartitionQueueSetFactory<T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement> {
+
+		/**
+		 * Creates a new queue for a given key-group partition.
+		 *
+		 * @param keyGroupId the key-group of the elements managed by the produced queue.
+		 * @param numKeyGroups the total number of key-groups in the job.
+		 * @param elementComparator the comparator that determines the order of the managed elements.
+		 * @return a new queue for the given key-group.
+		 */
+		@Nonnull
+		PQS create(@Nonnegative int keyGroupId, @Nonnegative int numKeyGroups, @Nonnull Comparator<T> elementComparator);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
new file mode 100644
index 0000000..0e7d9dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.Preconditions;
+
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Comparator;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} based
+ * on an AVL-Tree. We chose the implementation from fastutil over JDK for performance reasons.
+ *
+ * <p>Maintainer notes: We can consider the following potential performance improvements. First, we could introduce a
+ * bulk-load method to OrderedSetCache to exploit the fact that adding from an OrderedSetStore is already happening in
+ * sorted order, e.g. there are more efficient ways to construct search trees from sorted elements. Second, we could
+ * replace the internal AVL-Tree with an extended variant of {@link HeapPriorityQueueSet} that is organized as a
+ * Min-Max-Heap.
+ *
+ * @param <E> type of the contained elements.
+ */
+public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+
+	/** The tree is used to store cached elements. */
+	@Nonnull
+	private final ObjectAVLTreeSet<E> avlTree;
+
+	/** The element comparator. */
+	@Nonnull
+	private final Comparator<E> elementComparator;
+
+	/** The maximum capacity of the cache. */
+	@Nonnegative
+	private final int capacity;
+
+	/**
+	 * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator. Capacity must be > 0.
+	 * @param elementComparator comparator for the cached elements.
+	 * @param capacity the capacity of the cache. Must be > 0.
+	 */
+	public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) {
+		Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0.");
+		this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
+		this.elementComparator = elementComparator;
+		this.capacity = capacity;
+	}
+
+	/**
+	 * Adds the element to the cache. Callers are expected to add only while the cache is not full; this is checked
+	 * by an assertion only.
+	 */
+	@Override
+	public void add(@Nonnull E element) {
+		assert !isFull();
+		avlTree.add(element);
+	}
+
+	@Override
+	public void remove(@Nonnull E element) {
+		avlTree.remove(element);
+	}
+
+	/**
+	 * Returns <code>true</code> if the cache holds at least as many elements as its capacity.
+	 */
+	@Override
+	public boolean isFull() {
+		// ">=" instead of "==": the capacity is only guarded by an assertion in add(...). If it is ever exceeded with
+		// assertions disabled, the cache must keep reporting "full" instead of growing without bound.
+		return avlTree.size() >= capacity;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return avlTree.isEmpty();
+	}
+
+	/**
+	 * Returns <code>true</code> if the cache is empty or if the given element orders strictly before the last
+	 * (greatest) cached element.
+	 */
+	@Override
+	public boolean isInLowerBound(@Nonnull E toCheck) {
+		return avlTree.isEmpty() || elementComparator.compare(peekLast(), toCheck) > 0;
+	}
+
+	/**
+	 * Removes and returns the first (smallest) cached element, or <code>null</code> if the cache is empty.
+	 */
+	@Nullable
+	@Override
+	public E removeFirst() {
+		if (avlTree.isEmpty()) {
+			return null;
+		}
+		final E first = avlTree.first();
+		avlTree.remove(first);
+		return first;
+	}
+
+	/**
+	 * Removes and returns the last (greatest) cached element, or <code>null</code> if the cache is empty.
+	 */
+	@Nullable
+	@Override
+	public E removeLast() {
+		if (avlTree.isEmpty()) {
+			return null;
+		}
+		final E last = avlTree.last();
+		avlTree.remove(last);
+		return last;
+	}
+
+	/**
+	 * Returns the first (smallest) cached element without removing it, or <code>null</code> if the cache is empty.
+	 */
+	@Nullable
+	@Override
+	public E peekFirst() {
+		return !avlTree.isEmpty() ? avlTree.first() : null;
+	}
+
+	/**
+	 * Returns the last (greatest) cached element without removing it, or <code>null</code> if the cache is empty.
+	 */
+	@Nullable
+	@Override
+	public E peekLast() {
+		return !avlTree.isEmpty() ? avlTree.last() : null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
new file mode 100644
index 0000000..c0c3ba4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test base for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+	/**
+	 * Adds {@code count} randomly generated, pairwise distinct elements to both the queue and the check set.
+	 *
+	 * <p>With a chance of one in ten, the priority of an inserted element is re-used for the next generated
+	 * candidate, so that ties in priority are exercised. Whenever the inserted element is the head of the queue
+	 * afterwards, {@code add} must have returned {@code true}. If the queue was empty on entry, its final size is
+	 * additionally compared against {@code count}.
+	 *
+	 * @param priorityQueue the queue under test that receives the elements.
+	 * @param checkSet receives the same elements and is used to reject duplicates while generating.
+	 * @param count the number of elements to insert.
+	 */
+	protected static void insertRandomElements(
+		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+		@Nonnull Set<TestElement> checkSet,
+		int count) {
+
+		final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+		final int keySpace = Math.max(count / 4, 64);
+		final boolean queueWasEmpty = priorityQueue.isEmpty();
+
+		// Long.MIN_VALUE doubles as the marker for "no priority scheduled for re-use".
+		long pendingDuplicate = Long.MIN_VALUE;
+
+		for (int inserted = 0; inserted < count; ++inserted) {
+			TestElement candidate;
+			do {
+				final long prio;
+				if (pendingDuplicate != Long.MIN_VALUE) {
+					// consume the scheduled priority exactly once
+					prio = pendingDuplicate;
+					pendingDuplicate = Long.MIN_VALUE;
+				} else {
+					prio = rnd.nextLong();
+				}
+				candidate = new TestElement(rnd.nextInt(keySpace), prio);
+			} while (!checkSet.add(candidate));
+
+			if (rnd.nextInt(10) == 0) {
+				pendingDuplicate = candidate.getPriority();
+			}
+
+			final boolean reportedHeadChange = priorityQueue.add(candidate);
+			if (candidate.equals(priorityQueue.peek())) {
+				Assert.assertTrue(reportedHeadChange);
+			}
+		}
+
+		// The size is only predictable when nothing was in the queue before.
+		if (queueWasEmpty) {
+			Assert.assertEquals(count, priorityQueue.size());
+		}
+	}
+
+	/**
+	 * Fills a queue with random elements and drains it through peek/poll. Checks that peek and poll agree, that the
+	 * size shrinks by one per poll and that priorities never decrease.
+	 */
+	@Test
+	public void testPeekPollOrder() {
+		final int initialCapacity = 4;
+		final int testSize = 1000;
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(initialCapacity);
+		final HashSet<TestElement> expected = new HashSet<>(testSize);
+
+		insertRandomElements(queue, expected, testSize);
+
+		int remaining = queue.size();
+		Assert.assertEquals(testSize, remaining);
+
+		long previousPriority = Long.MIN_VALUE;
+		for (TestElement head = queue.peek(); head != null; head = queue.peek()) {
+			Assert.assertFalse(queue.isEmpty());
+			Assert.assertEquals(remaining, queue.size());
+			Assert.assertEquals(head, queue.poll());
+			Assert.assertTrue(expected.remove(head));
+			Assert.assertTrue(head.getPriority() >= previousPriority);
+			previousPriority = head.getPriority();
+			--remaining;
+		}
+
+		// everything that went in must have come out again
+		Assert.assertTrue(queue.isEmpty());
+		Assert.assertEquals(0, queue.size());
+		Assert.assertEquals(0, expected.size());
+	}
+
+	/**
+	 * Interleaves removals, polls and random insertions and verifies that polling always yields elements in
+	 * non-decreasing priority order.
+	 */
+	@Test
+	public void testRemoveInsertMixKeepsOrder() {
+
+		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+		final ThreadLocalRandom random = ThreadLocalRandom.current();
+		final int testSize = 300;
+		final int addCounterMax = testSize / 4;
+		// The countdown is drawn from [1, addCounterMax]. A start value of 0 would be decremented past zero by the
+		// pre-decrement check below, so the interleaved insertions would silently never happen.
+		int iterationsTillNextAdds = 1 + random.nextInt(addCounterMax);
+		HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+		insertRandomElements(priorityQueue, checkSet, testSize);
+
+		// check that the whole set is still in order
+		while (!checkSet.isEmpty()) {
+
+			// take an arbitrary tracked element and remove it from the queue as well
+			Iterator<TestElement> iterator = checkSet.iterator();
+			TestElement element = iterator.next();
+			iterator.remove();
+
+			final boolean removesHead = element.equals(priorityQueue.peek());
+
+			if (removesHead) {
+				// the test requires remove() to return true when the current head is removed
+				Assert.assertTrue(priorityQueue.remove(element));
+			} else {
+				priorityQueue.remove(element);
+			}
+
+			long lastPriorityValue = removesHead ? element.getPriority() : Long.MIN_VALUE;
+
+			while ((element = priorityQueue.poll()) != null) {
+				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+				lastPriorityValue = element.getPriority();
+				if (--iterationsTillNextAdds == 0) {
+					// some random adds; they are tracked in a throw-away copy so that checkSet stays untouched
+					iterationsTillNextAdds = 1 + random.nextInt(addCounterMax);
+					insertRandomElements(priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3));
+					// the new elements may sort before everything polled so far, so continue from the new head
+					lastPriorityValue = priorityQueue.peek().getPriority();
+				}
+			}
+
+			Assert.assertTrue(priorityQueue.isEmpty());
+
+			// refill with the remaining tracked elements for the next round
+			priorityQueue.addAll(checkSet);
+		}
+	}
+
+	/**
+	 * Checks that poll returns {@code null} on an empty queue and otherwise hands out all inserted elements in
+	 * non-decreasing priority order.
+	 */
+	@Test
+	public void testPoll() {
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(3);
+
+		// nothing to poll from a fresh queue
+		Assert.assertNull(queue.poll());
+
+		final int testSize = 345;
+		final HashSet<TestElement> expected = new HashSet<>(testSize);
+		insertRandomElements(queue, expected, testSize);
+
+		long previousPriority = Long.MIN_VALUE;
+		while (!queue.isEmpty()) {
+			final TestElement polled = queue.poll();
+			Assert.assertNotNull(polled);
+			Assert.assertTrue(expected.remove(polled));
+			Assert.assertTrue(polled.getPriority() >= previousPriority);
+			previousPriority = polled.getPriority();
+		}
+		Assert.assertTrue(expected.isEmpty());
+
+		// and nothing is left once the queue has been drained
+		Assert.assertNull(queue.poll());
+	}
+
+	/**
+	 * Checks that isEmpty follows a single add and the subsequent poll.
+	 */
+	@Test
+	public void testIsEmpty() {
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+		Assert.assertTrue(queue.isEmpty());
+
+		// the first element always becomes the head, so add must return true
+		final TestElement onlyElement = new TestElement(4711L, 42L);
+		Assert.assertTrue(queue.add(onlyElement));
+		Assert.assertFalse(queue.isEmpty());
+
+		queue.poll();
+		Assert.assertTrue(queue.isEmpty());
+	}
+
+	/**
+	 * Bulk-adds every element three times (two deep copies plus the original) and checks the resulting size and
+	 * iterator content. With set semantics each element must survive exactly once, otherwise all three survive.
+	 */
+	@Test
+	public void testBulkAddRestoredElements() throws Exception {
+		final int testSize = 10;
+		final HashSet<TestElement> originals = new HashSet<>(testSize);
+		for (int i = 0; i < testSize; ++i) {
+			originals.add(new TestElement(i, i));
+		}
+
+		// two equal but not identical copies per original element
+		final List<TestElement> copies = new ArrayList<>(originals.size() * 2);
+		for (TestElement original : originals) {
+			for (int copy = 0; copy < 2; ++copy) {
+				copies.add(original.deepCopy());
+			}
+		}
+
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+		queue.addAll(copies);
+		queue.addAll(originals);
+
+		final int expectedSize;
+		if (testSetSemanticsAgainstDuplicateElements()) {
+			expectedSize = originals.size();
+		} else {
+			expectedSize = 3 * originals.size();
+		}
+		Assert.assertEquals(expectedSize, queue.size());
+
+		try (final CloseableIterator<TestElement> iterator = queue.iterator()) {
+			while (iterator.hasNext()) {
+				if (testSetSemanticsAgainstDuplicateElements()) {
+					// every element may show up only once, so it can be ticked off
+					Assert.assertTrue(originals.remove(iterator.next()));
+				} else {
+					Assert.assertTrue(originals.contains(iterator.next()));
+				}
+			}
+		}
+
+		if (testSetSemanticsAgainstDuplicateElements()) {
+			Assert.assertTrue(originals.isEmpty());
+		}
+	}
+
+	/**
+	 * Checks the iterator of an empty queue (no elements, {@code next()} throws) and that the iterator of a filled
+	 * queue visits every inserted element exactly once.
+	 */
+	@Test
+	public void testIterator() throws Exception {
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+
+		// test empty iterator
+		try (CloseableIterator<TestElement> emptyIterator = queue.iterator()) {
+			Assert.assertFalse(emptyIterator.hasNext());
+			try {
+				emptyIterator.next();
+				Assert.fail();
+			} catch (NoSuchElementException expected) {
+				// this is the required reaction to next() on an exhausted iterator
+			}
+		}
+
+		// iterate some data
+		final int testSize = 10;
+		final HashSet<TestElement> notYetSeen = new HashSet<>(testSize);
+		insertRandomElements(queue, notYetSeen, testSize);
+		try (CloseableIterator<TestElement> filledIterator = queue.iterator()) {
+			while (filledIterator.hasNext()) {
+				final TestElement visited = filledIterator.next();
+				Assert.assertTrue(notYetSeen.remove(visited));
+			}
+			Assert.assertTrue(notYetSeen.isEmpty());
+		}
+	}
+
+	/**
+	 * Adds two elements and checks the reported head changes, the sizes and the poll order. For implementations
+	 * with set semantics, an equal copy of an already contained element must not increase the size.
+	 */
+	@Test
+	public void testAdd() {
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+
+		// the smaller priority value (23) is polled before the larger one (42)
+		final TestElement polledSecond = new TestElement(4711L, 42L);
+		final TestElement polledFirst = new TestElement(815L, 23L);
+
+		Assert.assertTrue(queue.add(polledSecond));
+		if (testSetSemanticsAgainstDuplicateElements()) {
+			queue.add(polledSecond.deepCopy());
+		}
+		Assert.assertEquals(1, queue.size());
+
+		// the new element takes over the head, which add has to report
+		Assert.assertTrue(queue.add(polledFirst));
+		Assert.assertEquals(2, queue.size());
+
+		Assert.assertEquals(polledFirst, queue.poll());
+		Assert.assertEquals(1, queue.size());
+
+		Assert.assertEquals(polledSecond, queue.poll());
+		Assert.assertEquals(0, queue.size());
+	}
+
+	/**
+	 * Checks that removing the only element returns {@code true} and empties the queue. For implementations with
+	 * set semantics, removing an element that is not contained must return {@code false}.
+	 */
+	@Test
+	public void testRemove() {
+		final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+		final boolean setSemantics = testSetSemanticsAgainstDuplicateElements();
+
+		final long key = 4711L;
+		final long priorityValue = 42L;
+		final TestElement element = new TestElement(key, priorityValue);
+
+		// not contained yet
+		if (setSemantics) {
+			Assert.assertFalse(queue.remove(element));
+		}
+
+		Assert.assertTrue(queue.add(element));
+		Assert.assertTrue(queue.remove(element));
+
+		// not contained anymore
+		if (setSemantics) {
+			Assert.assertFalse(queue.remove(element));
+		}
+		Assert.assertTrue(queue.isEmpty());
+	}
+
+	/**
+	 * Creates a new, empty instance of the queue implementation under test. Every test case obtains its own queue
+	 * through this method.
+	 *
+	 * @param initialCapacity the initial capacity requested by the test; implementations may ignore it.
+	 * @return the queue to run the test against.
+	 */
+	protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
+
+	/**
+	 * Tells the shared test cases which duplicate handling to expect from the queue under test.
+	 *
+	 * @return {@code true} if the tests should expect equal elements to be stored only once (and removal of a
+	 *     missing element to return {@code false}); {@code false} if duplicates are expected to be kept.
+	 */
+	protected abstract boolean testSetSemanticsAgainstDuplicateElements();
+
+	/**
+	 * Payload for usage in the test. An element is identified by its key and priority; the internal index is
+	 * bookkeeping for heap-based queues and takes no part in {@link #equals(Object)} or {@link #hashCode()}.
+	 */
+	protected static class TestElement implements HeapPriorityQueueElement {
+
+		private final long key;
+		private final long priority;
+
+		/** Index managed through the {@link HeapPriorityQueueElement} accessors; starts out as "not contained". */
+		private int internalIndex = NOT_CONTAINED;
+
+		public TestElement(long key, long priority) {
+			this.key = key;
+			this.priority = priority;
+		}
+
+		public long getKey() {
+			return key;
+		}
+
+		public long getPriority() {
+			return priority;
+		}
+
+		@Override
+		public int getInternalIndex() {
+			return internalIndex;
+		}
+
+		@Override
+		public void setInternalIndex(int newIndex) {
+			internalIndex = newIndex;
+		}
+
+		/**
+		 * Two elements are equal if they are of the same class and agree in key and priority.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (o == this) {
+				return true;
+			}
+			if (o == null) {
+				return false;
+			}
+			if (getClass() != o.getClass()) {
+				return false;
+			}
+			final TestElement other = (TestElement) o;
+			if (getKey() != other.getKey()) {
+				return false;
+			}
+			return getPriority() == other.getPriority();
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(getKey(), getPriority());
+		}
+
+		/**
+		 * Creates an equal but not identical element. The copy starts with a fresh internal index.
+		 */
+		public TestElement deepCopy() {
+			return new TestElement(key, priority);
+		}
+	}
+
+	/**
+	 * Serializer for {@link TestElement}. The serialization format produced by this serializer allows lexicographic
+	 * ordering by {@link TestElement#getPriority}.
+	 *
+	 * <p>The serializer is stateless and only available through {@link #INSTANCE}. Configuration snapshots and
+	 * compatibility checks are not supported and throw {@link UnsupportedOperationException}.
+	 */
+	protected static class TestElementSerializer extends TypeSerializer<TestElement> {
+
+		public static final TestElementSerializer INSTANCE = new TestElementSerializer();
+
+		private TestElementSerializer() {
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public TypeSerializer<TestElement> duplicate() {
+			// stateless, so the same instance can be shared
+			return this;
+		}
+
+		@Override
+		public TestElement createInstance() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public TestElement copy(TestElement from) {
+			return new TestElement(from.key, from.priority);
+		}
+
+		@Override
+		public TestElement copy(TestElement from, TestElement reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			// fixed length: priority and key are written as one long each
+			return 2 * Long.BYTES;
+		}
+
+		@Override
+		public void serialize(TestElement record, DataOutputView target) throws IOException {
+			// serialize priority first, so that we have correct order in RocksDB. We flip the sign bit for correct
+			// lexicographic order.
+			target.writeLong(MathUtils.flipSignBit(record.getPriority()));
+			target.writeLong(record.getKey());
+		}
+
+		@Override
+		public TestElement deserialize(DataInputView source) throws IOException {
+			// mirror of serialize(): undo the sign bit flip on the priority, then read the key
+			long prio = MathUtils.flipSignBit(source.readLong());
+			long key = source.readLong();
+			return new TestElement(key, prio);
+		}
+
+		@Override
+		public TestElement deserialize(TestElement reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			serialize(deserialize(source), target);
+		}
+
+		/**
+		 * All instances of this stateless serializer are interchangeable. Returning {@code false} unconditionally
+		 * would violate the reflexivity required by {@link Object#equals(Object)} and contradict the constant
+		 * {@link #hashCode()}.
+		 */
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestElementSerializer;
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestElementSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return 4711;
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public CompatibilityResult<TestElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
index 9298743..e6b7739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
@@ -46,14 +46,14 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
 		new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(0));
 
 	@Nonnull
-	protected final KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction;
+	protected final KeyExtractorFunction<T> keyExtractorFunction;
 
 	@Nonnull
 	protected final Function<Random, T> elementGenerator;
 
 	protected KeyGroupPartitionerTestBase(
 		@Nonnull Function<Random, T> elementGenerator,
-		@Nonnull KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction) {
+		@Nonnull KeyExtractorFunction<T> keyExtractorFunction) {
 
 		this.elementGenerator = elementGenerator;
 		this.keyExtractorFunction = keyExtractorFunction;
@@ -138,7 +138,7 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
 	static final class ValidatingElementWriterDummy<T> implements KeyGroupPartitioner.ElementWriterFunction<T> {
 
 		@Nonnull
-		private final KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction;
+		private final KeyExtractorFunction<T> keyExtractorFunction;
 		@Nonnegative
 		private final int numberOfKeyGroups;
 		@Nonnull
@@ -147,7 +147,7 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
 		private int currentKeyGroup;
 
 		ValidatingElementWriterDummy(
-			@Nonnull KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction,
+			@Nonnull KeyExtractorFunction<T> keyExtractorFunction,
 			@Nonnegative int numberOfKeyGroups,
 			@Nonnull Set<T> allElementsSet) {
 			this.keyExtractorFunction = keyExtractorFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
new file mode 100644
index 0000000..79fd556
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+/**
+ * Test for {@link CachingInternalPriorityQueueSet}. Subclasses supply the cache and the store that the queue under
+ * test is assembled from.
+ */
+public abstract class CachingInternalPriorityQueueSetTestBase extends InternalPriorityQueueTestBase {
+
+	/**
+	 * Creates the cache for a new queue under test.
+	 */
+	protected abstract CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache();
+
+	/**
+	 * Creates the store for a new queue under test.
+	 */
+	protected abstract CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore();
+
+	/**
+	 * Builds the queue from a newly created cache and store. The requested initial capacity is not used.
+	 */
+	@Override
+	protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+		// the cache is created before the store
+		return new CachingInternalPriorityQueueSet<>(createOrderedSetCache(), createOrderedSetStore());
+	}
+
+	@Override
+	protected boolean testSetSemanticsAgainstDuplicateElements() {
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
new file mode 100644
index 0000000..618da4e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Test for {@link HeapPriorityQueueSet}. Runs all cases of {@link HeapPriorityQueueTest} against the set variant and
+ * expects duplicate elements to be stored only once.
+ */
+public class HeapPriorityQueueSetTest extends HeapPriorityQueueTest {
+
+	@Override
+	protected boolean testSetSemanticsAgainstDuplicateElements() {
+		return true;
+	}
+
+	@Override
+	protected HeapPriorityQueueSet<TestElement> newPriorityQueue(int initialCapacity) {
+		// the queue spans the complete test key-group range
+		final int totalKeyGroups = KEY_GROUP_RANGE.getNumberOfKeyGroups();
+		return new HeapPriorityQueueSet<>(
+			TEST_ELEMENT_COMPARATOR,
+			KEY_EXTRACTOR_FUNCTION,
+			initialCapacity,
+			KEY_GROUP_RANGE,
+			totalKeyGroups);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
new file mode 100644
index 0000000..8ffb8b8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Test for {@link HeapPriorityQueue}. Adds cases for {@code clear} and {@code toArray} on top of the shared cases
+ * from {@link InternalPriorityQueueTestBase} and expects duplicate elements to be kept.
+ */
+public class HeapPriorityQueueTest extends InternalPriorityQueueTestBase {
+
+	/**
+	 * Checks that clear drops all previously inserted elements.
+	 */
+	@Test
+	public void testClear() {
+		final HeapPriorityQueue<TestElement> queue = newPriorityQueue(1);
+
+		final int count = 10;
+		final HashSet<TestElement> inserted = new HashSet<>(count);
+		insertRandomElements(queue, inserted, count);
+		Assert.assertEquals(count, queue.size());
+
+		queue.clear();
+		Assert.assertEquals(0, queue.size());
+	}
+
+	/**
+	 * Checks toArray with a target array that is too small, one that fits exactly and one that is too large. The
+	 * passed array must be re-used exactly when it can hold all elements.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testToArray() {
+
+		final int testSize = 10;
+
+		// too small, exact fit, one slot too many
+		final List<TestElement[]> targets = new ArrayList<>(3);
+		targets.add(new TestElement[0]);
+		targets.add(new TestElement[testSize]);
+		targets.add(new TestElement[testSize + 1]);
+
+		for (TestElement[] target : targets) {
+
+			// pre-fill with a dummy element that is stored in every slot
+			Arrays.fill(target, new TestElement(42L, 4711L));
+
+			final HashSet<TestElement> expected = new HashSet<>(testSize);
+			final HeapPriorityQueue<TestElement> queue = newPriorityQueue(1);
+
+			// for an empty queue the returned array has the length of the passed one
+			Assert.assertEquals(target.length, queue.toArray(target).length);
+
+			insertRandomElements(queue, expected, testSize);
+
+			final TestElement[] result = queue.toArray(target);
+
+			final boolean targetIsLargeEnough = target.length >= testSize;
+			final boolean targetWasReused = target == result;
+			Assert.assertEquals(targetIsLargeEnough, targetWasReused);
+
+			// count the elements up to the first null slot or the end of the array
+			int found = 0;
+			for (TestElement entry : result) {
+				if (entry == null) {
+					break;
+				}
+				Assert.assertTrue(expected.remove(entry));
+				++found;
+			}
+
+			Assert.assertEquals(queue.size(), found);
+			Assert.assertTrue(expected.isEmpty());
+		}
+	}
+
+	@Override
+	protected HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+		return new HeapPriorityQueue<>(TEST_ELEMENT_COMPARATOR, initialCapacity);
+	}
+
+	@Override
+	protected boolean testSetSemanticsAgainstDuplicateElements() {
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
new file mode 100644
index 0000000..277de19
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+/**
+ * Test for {@link KeyGroupPartitionedPriorityQueue}. Every partition is a {@link CachingInternalPriorityQueueSet}
+ * built from a {@link TreeOrderedSetCache} and a {@code TestOrderedStore}; duplicates are expected to be stored
+ * only once.
+ */
+public class KeyGroupPartitionedPriorityQueueTest extends InternalPriorityQueueTestBase {
+
+	@Override
+	protected boolean testSetSemanticsAgainstDuplicateElements() {
+		return true;
+	}
+
+	@Override
+	protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+		final KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+			TestElement, CachingInternalPriorityQueueSet<TestElement>> partitionFactory = newFactory(initialCapacity);
+
+		return new KeyGroupPartitionedPriorityQueue<>(
+			KEY_EXTRACTOR_FUNCTION,
+			TEST_ELEMENT_COMPARATOR,
+			partitionFactory,
+			KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups());
+	}
+
+	/**
+	 * Creates the factory for the per-key-group partitions. The factory arguments and {@code initialCapacity} are
+	 * not used: every partition is ordered by the test comparator and gets a cache with a fixed capacity.
+	 */
+	protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+			TestElement, CachingInternalPriorityQueueSet<TestElement>> newFactory(int initialCapacity) {
+
+		final int cacheCapacity = 32;
+
+		return (keyGroupId, numKeyGroups, elementComparator) -> {
+			final CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> partitionCache =
+				new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, cacheCapacity);
+			final CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> partitionStore =
+				new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR);
+			return new CachingInternalPriorityQueueSet<>(partitionCache, partitionStore);
+		};
+	}
+}