You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/03 17:32:52 UTC
[3/4] flink git commit: [FLINK-9491][state] Generalization of timer
queue (decoupled from timers) and second implementation based on RocksDB
[FLINK-9491][state] Generalization of timer queue (decoupled from timers) and second implementation based on RocksDB
This closes #6228.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6ad421e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6ad421e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6ad421e
Branch: refs/heads/master
Commit: c6ad421e2b83fe8dc6031ce9716285fdf941d5e0
Parents: 8cb89f9
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jun 18 14:38:01 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 3 19:32:08 2018 +0200
----------------------------------------------------------------------
.../apache/flink/util/CloseableIterator.java | 86 ++++
.../java/org/apache/flink/util/MathUtils.java | 10 +
.../org/apache/flink/util/MathUtilTest.java | 11 +
flink-runtime/pom.xml | 7 +-
.../runtime/state/InternalPriorityQueue.java | 99 ++++
.../runtime/state/KeyExtractorFunction.java | 36 ++
.../runtime/state/KeyGroupPartitioner.java | 13 -
.../heap/CachingInternalPriorityQueueSet.java | 288 ++++++++++++
.../runtime/state/heap/HeapPriorityQueue.java | 343 ++++++++++++++
.../state/heap/HeapPriorityQueueElement.java | 49 ++
.../state/heap/HeapPriorityQueueSet.java | 185 ++++++++
.../heap/KeyGroupPartitionedPriorityQueue.java | 280 +++++++++++
.../runtime/state/heap/TreeOrderedSetCache.java | 128 +++++
.../state/InternalPriorityQueueTestBase.java | 466 +++++++++++++++++++
.../state/KeyGroupPartitionerTestBase.java | 8 +-
...CachingInternalPriorityQueueSetTestBase.java | 43 ++
.../state/heap/HeapPriorityQueueSetTest.java | 39 ++
.../state/heap/HeapPriorityQueueTest.java | 99 ++++
.../KeyGroupPartitionedPriorityQueueTest.java | 53 +++
.../state/heap/OrderedSetCacheTestBase.java | 102 ++++
...mpleCachingInternalPriorityQueueSetTest.java | 35 ++
.../runtime/state/heap/TestOrderedStore.java | 60 +++
.../state/heap/TreeOrderedSetCacheTest.java | 30 ++
.../state/RocksDBKeySerializationUtils.java | 4 +
.../state/RocksDBKeyedStateBackend.java | 3 +-
.../streaming/state/RocksDBOrderedSetStore.java | 278 +++++++++++
...nalPriorityQueueSetWithRocksDBStoreTest.java | 66 +++
...tionedPriorityQueueWithRocksDBStoreTest.java | 53 +++
.../state/RocksDBOrderedSetStoreTest.java | 137 ++++++
.../streaming/state/RocksDBResource.java | 159 +++++++
.../api/operators/HeapInternalTimerService.java | 82 ++--
.../HeapPriorityQueueStateSnapshot.java | 112 +++++
.../api/operators/InternalTimerHeap.java | 413 ----------------
.../operators/InternalTimerHeapSnapshot.java | 97 ----
.../api/operators/InternalTimersSnapshot.java | 16 +-
.../InternalTimersSnapshotReaderWriters.java | 16 +-
.../api/operators/TimerHeapInternalTimer.java | 70 ++-
.../operators/HeapInternalTimerServiceTest.java | 14 +-
.../api/operators/InternalTimerHeapTest.java | 301 ------------
39 files changed, 3383 insertions(+), 908 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
new file mode 100644
index 0000000..09ea046
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+/**
+ * This interface represents an {@link Iterator} that is also {@link AutoCloseable}. Typical use-cases for this
+ * interface are iterators that are based on native resources such as files, network, or database connections. Clients
+ * must call {@link #close()} after using the iterator.
+ *
+ * @param <T> the type of iterated elements.
+ */
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+
+ CloseableIterator<?> EMPTY_INSTANCE = CloseableIterator.adapterForIterator(Collections.emptyIterator());
+
+ @Nonnull
+ static <T> CloseableIterator<T> adapterForIterator(@Nonnull Iterator<T> iterator) {
+ return new IteratorAdapter<>(iterator);
+ }
+
+ @SuppressWarnings("unchecked")
+ static <T> CloseableIterator<T> empty() {
+ return (CloseableIterator<T>) EMPTY_INSTANCE;
+ }
+
+ /**
+ * Adapter from {@link Iterator} to {@link CloseableIterator}. Does nothing on {@link #close()}.
+ *
+ * @param <E> the type of iterated elements.
+ */
+ final class IteratorAdapter<E> implements CloseableIterator<E> {
+
+ @Nonnull
+ private final Iterator<E> delegate;
+
+ IteratorAdapter(@Nonnull Iterator<E> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public E next() {
+ return delegate.next();
+ }
+
+ @Override
+ public void remove() {
+ delegate.remove();
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super E> action) {
+ delegate.forEachRemaining(action);
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index 48c32a3..96fc3c4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -197,6 +197,16 @@ public final class MathUtils {
return in;
}
+ /**
+ * Flips the sign bit (most-significant-bit) of the input.
+ *
+ * @param in the input value.
+ * @return the input with a flipped sign bit (most-significant-bit).
+ */
+ public static long flipSignBit(long in) {
+ return in ^ Long.MIN_VALUE;
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
index 82c17e9..17a915c 100644
--- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.util;
+import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -131,4 +132,14 @@ public class MathUtilTest {
assertFalse(MathUtils.isPowerOf2(Integer.MAX_VALUE));
assertFalse(MathUtils.isPowerOf2(Long.MAX_VALUE));
}
+
+ @Test
+ public void testFlipSignBit() {
+ Assert.assertEquals(0L, MathUtils.flipSignBit(Long.MIN_VALUE));
+ Assert.assertEquals(Long.MIN_VALUE, MathUtils.flipSignBit(0L));
+ Assert.assertEquals(-1L, MathUtils.flipSignBit(Long.MAX_VALUE));
+ Assert.assertEquals(Long.MAX_VALUE, MathUtils.flipSignBit(-1L));
+ Assert.assertEquals(42L | Long.MIN_VALUE, MathUtils.flipSignBit(42L));
+ Assert.assertEquals(-42L & Long.MAX_VALUE, MathUtils.flipSignBit(-42L));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 9a4034f..cfc57c0 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -116,6 +116,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ <version>8.2.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
@@ -306,7 +312,6 @@ under the License.
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
-
</dependencies>
<!-- Dependency Management to converge transitive dependency versions -->
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
new file mode 100644
index 0000000..fb3ee82
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for a collection that gives in-order access to elements w.r.t. their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+ /**
+ * Retrieves and removes the first element (w.r.t. the order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T poll();
+
+ /**
+ * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T peek();
+
+ /**
+ * Adds the given element to the set, if it is not already contained.
+ *
+ * @param toAdd the element to add to the set.
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> if the head element was not changed by this operation.
+ */
+ boolean add(@Nonnull T toAdd);
+
+ /**
+ * Removes the given element from the set, if it is contained in the set.
+ *
+ * @param toRemove the element to remove.
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> if the head element was not changed by this operation.
+ */
+ boolean remove(@Nonnull T toRemove);
+
+ /**
+ * Checks whether the set is empty, i.e. contains no elements.
+ *
+ * @return true if the set is empty, i.e. no element is contained.
+ */
+ boolean isEmpty();
+
+ /**
+ * Returns the number of elements in this set.
+ *
+ * @return the number of elements in this set.
+ */
+ @Nonnegative
+ int size();
+
+ /**
+ * Adds all the given elements to the set.
+ */
+ void addAll(@Nullable Collection<? extends T> toAdd);
+
+ /**
+ * Iterator over all elements, no order guaranteed. Iterator must be closed after usage.
+ */
+ @Nonnull
+ CloseableIterator<T> iterator();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
new file mode 100644
index 0000000..a3ce11c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Function to extract a key from a given object.
+ *
+ * @param <T> type of the element from which we extract the key.
+ */
+@FunctionalInterface
+public interface KeyExtractorFunction<T> {
+
+ /**
+ * Returns the key for the given element by which the key-group can be computed.
+ */
+ @Nonnull
+ Object extractKeyFromElement(@Nonnull T element);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
index 673a0ef..6a9dfb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
@@ -265,19 +265,6 @@ public class KeyGroupPartitioner<T> {
}
/**
- * @param <T> type of the element from which we extract the key.
- */
- @FunctionalInterface
- public interface KeyExtractorFunction<T> {
-
- /**
- * Returns the key for the given element by which the key-group can be computed.
- */
- @Nonnull
- Object extractKeyFromElement(@Nonnull T element);
- }
-
- /**
* This functional interface defines how one element is written to a {@link DataOutputView}.
*
* @param <T> type of the written elements.
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
new file mode 100644
index 0000000..771315d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store that manages the ground truth
+ * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains an ordered
+ * (partial) view on the top elements in the ordered store. We are currently applying simple write-through to keep cache
+ * and store in sync on updates and refill the cache from the store when it is empty and we expect more elements
+ * contained in the store.
+ *
+ * @param <E> the type of the managed elements.
+ */
+public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue<E>, HeapPriorityQueueElement {
+
+ /** An ordered set cache that contains a (partial) view on the top elements in the ordered store. */
+ @Nonnull
+ private final OrderedSetCache<E> orderedCache;
+
+ /** A store with ordered set semantics that contains the ground truth of all inserted elements. */
+ @Nonnull
+ private final OrderedSetStore<E> orderedStore;
+
+ /** This flag is true if there could be elements in the backend that are not in the cache (false positives ok). */
+ private boolean storeOnlyElements;
+
+ /** Management data for the {@link HeapPriorityQueueElement} trait. */
+ private int pqManagedIndex;
+
+ @SuppressWarnings("unchecked")
+ public CachingInternalPriorityQueueSet(
+ @Nonnull OrderedSetCache<E> orderedCache,
+ @Nonnull OrderedSetStore<E> orderedStore) {
+ this.orderedCache = orderedCache;
+ this.orderedStore = orderedStore;
+ // We are careful and set this to true. Could be set to false if we know for sure that the store is empty, but
+ // checking this could be an expensive operation.
+ this.storeOnlyElements = true;
+ this.pqManagedIndex = HeapPriorityQueueElement.NOT_CONTAINED;
+ }
+
+ @Nullable
+ @Override
+ public E peek() {
+
+ checkRefillCacheFromStore();
+
+ return orderedCache.peekFirst();
+ }
+
+ @Nullable
+ @Override
+ public E poll() {
+
+ checkRefillCacheFromStore();
+
+ final E first = orderedCache.removeFirst();
+
+ if (first != null) {
+ // write-through sync
+ orderedStore.remove(first);
+ }
+
+ return first;
+ }
+
+ @Override
+ public boolean add(@Nonnull E toAdd) {
+
+ checkRefillCacheFromStore();
+
+ // write-through sync
+ orderedStore.add(toAdd);
+
+ final boolean cacheFull = orderedCache.isFull();
+
+ if ((!cacheFull && !storeOnlyElements) || orderedCache.isInLowerBound(toAdd)) {
+
+ if (cacheFull) {
+ // we drop the element with lowest priority from the cache
+ orderedCache.removeLast();
+ // the dropped element is now only in the store
+ storeOnlyElements = true;
+ }
+
+ orderedCache.add(toAdd);
+ return toAdd.equals(orderedCache.peekFirst());
+ } else {
+ // we only added to the store
+ storeOnlyElements = true;
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(@Nonnull E toRemove) {
+
+ checkRefillCacheFromStore();
+
+ boolean newHead = toRemove.equals(orderedCache.peekFirst());
+ // write-through sync
+ orderedStore.remove(toRemove);
+ orderedCache.remove(toRemove);
+ return newHead;
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends E> toAdd) {
+
+ if (toAdd == null) {
+ return;
+ }
+
+ for (E element : toAdd) {
+ add(element);
+ }
+ }
+
+ @Override
+ public int size() {
+ return orderedStore.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ checkRefillCacheFromStore();
+ return orderedCache.isEmpty();
+ }
+
+ @Nonnull
+ @Override
+ public CloseableIterator<E> iterator() {
+ return orderedStore.orderedIterator();
+ }
+
+ @Override
+ public int getInternalIndex() {
+ return pqManagedIndex;
+ }
+
+ @Override
+ public void setInternalIndex(int updateIndex) {
+ this.pqManagedIndex = updateIndex;
+ }
+
+ /**
+ * Refills the cache from the store when the cache is empty and we expect more elements in the store.
+ *
+ * TODO: We can think about exploiting the property that the store is already sorted when bulk-filling the cache.
+ */
+ private void checkRefillCacheFromStore() {
+ if (storeOnlyElements && orderedCache.isEmpty()) {
+ try (final CloseableIterator<E> iterator = orderedStore.orderedIterator()) {
+ while (iterator.hasNext() && !orderedCache.isFull()) {
+ orderedCache.add(iterator.next());
+ }
+ storeOnlyElements = iterator.hasNext();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Exception while closing RocksDB iterator.", e);
+ }
+ }
+ }
+
+ /**
+ * Interface for an ordered cache with set semantics of elements to be used in
+ * {@link CachingInternalPriorityQueueSet}. Cache implementations typically have a limited capacity as indicated
+ * via the {@link #isFull()} method.
+ *
+ * @param <E> the type of contained elements.
+ */
+ public interface OrderedSetCache<E> {
+
+ /**
+ * Adds the given element to the cache (if not yet contained). This method should only be called if the cache
+ * is not full.
+ * @param element element to add to the cache.
+ */
+ void add(@Nonnull E element);
+
+ /**
+ * Removes the given element from the cache (if contained).
+ * @param element element to remove from the cache.
+ */
+ void remove(@Nonnull E element);
+
+ /**
+ * Returns <code>true</code> if the cache is full and no more elements can be added.
+ */
+ boolean isFull();
+
+ /**
+ * Returns <code>true</code> if the cache is empty, i.e. contains no elements.
+ */
+ boolean isEmpty();
+
+ /**
+ * Returns true if the element compares smaller than the currently largest element in the cache.
+ */
+ boolean isInLowerBound(@Nonnull E toCheck);
+
+ /**
+ * Removes and returns the first (smallest) element from the cache.
+ */
+ @Nullable
+ E removeFirst();
+
+ /**
+ * Removes and returns the last (largest) element from the cache.
+ */
+ @Nullable
+ E removeLast();
+
+ /**
+ * Returns the first (smallest) element from the cache (without removing it).
+ */
+ @Nullable
+ E peekFirst();
+
+ /**
+ * Returns the last (largest) element from the cache (without removing it).
+ */
+ @Nullable
+ E peekLast();
+ }
+
+ /**
+ * Interface for an ordered store with set semantics of elements to be used in
+ * {@link CachingInternalPriorityQueueSet}. Stores are assumed to have (practically) unlimited capacity, but their
+ * operations could all be expensive.
+ *
+ * @param <E> the type of contained elements.
+ */
+ public interface OrderedSetStore<E> {
+
+ /**
+ * Adds the given element to the store (if not yet contained).
+ * @param element element to add to the store.
+ */
+ void add(@Nonnull E element);
+
+ /**
+ * Removes the given element from the store (if contained).
+ * @param element element to remove from the store.
+ */
+ void remove(@Nonnull E element);
+
+ /**
+ * Returns the number of elements in the store.
+ */
+ @Nonnegative
+ int size();
+
+ /**
+ * Returns an iterator over the store that returns element in order. The iterator must be closed by the client
+ * after usage.
+ */
+ @Nonnull
+ CloseableIterator<E> orderedIterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
new file mode 100644
index 0000000..7017905
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals. We use the managed index from {@link HeapPriorityQueueElement} to find an element in the queue
+ * array to support fast deletes.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+ /**
+ * The index of the head element in the array that represents the heap.
+ */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /**
+ * Comparator for the contained elements.
+ */
+ private final Comparator<T> elementComparator;
+
+ /**
+ * The array that represents the heap-organized priority queue.
+ */
+ private T[] queue;
+
+ /**
+ * The current size of the priority queue.
+ */
+ private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnegative int minimumCapacity) {
+
+ this.elementComparator = elementComparator;
+ this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+ }
+
+ @Override
+ @Nullable
+ public T peek() {
+ return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+ }
+
+ /**
+ * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> if the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ return addInternal(toAdd);
+ }
+
+ /**
+ * This remove is based on object identity, not the result of equals. We use the objects managed index to find
+ * the instance in the queue array.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> if the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ return removeInternal(toRemove);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ @Nonnegative
+ public int size() {
+ return size;
+ }
+
+ public void clear() {
+ size = 0;
+ Arrays.fill(queue, null);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ @Nonnull
+ public <O> O[] toArray(O[] out) {
+ if (out.length < size) {
+ return (O[]) Arrays.copyOfRange(queue, QUEUE_HEAD_INDEX, QUEUE_HEAD_INDEX + size, out.getClass());
+ } else {
+ System.arraycopy(queue, QUEUE_HEAD_INDEX, out, 0, size);
+ if (out.length > size) {
+ out[size] = null;
+ }
+ return out;
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this queue. The iterator
+ * does not return the elements in any particular order.
+ *
+ * @return an iterator over the elements in this queue.
+ */
+ @Nonnull
+ public CloseableIterator<T> iterator() {
+ return new HeapIterator();
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends T> restoredElements) {
+
+ if (restoredElements == null) {
+ return;
+ }
+
+ resizeForBulkLoad(restoredElements.size());
+
+ for (T element : restoredElements) {
+ add(element);
+ }
+ }
+
+ private boolean addInternal(@Nonnull T element) {
+ final int newSize = increaseSizeByOne();
+ moveElementToIdx(element, newSize);
+ siftUp(newSize);
+ return element.getInternalIndex() == QUEUE_HEAD_INDEX;
+ }
+
+ private boolean removeInternal(@Nonnull T elementToRemove) {
+ final int elementIndex = elementToRemove.getInternalIndex();
+ removeElementAtIndex(elementIndex);
+ return elementIndex == QUEUE_HEAD_INDEX;
+ }
+
+ private T removeElementAtIndex(int removeIdx) {
+ T[] heap = this.queue;
+ T removedValue = heap[removeIdx];
+
+ assert removedValue.getInternalIndex() == removeIdx;
+
+ final int oldSize = size;
+
+ if (removeIdx != oldSize) {
+ T element = heap[oldSize];
+ moveElementToIdx(element, removeIdx);
+ adjustElementAtIndex(element, removeIdx);
+ }
+
+ heap[oldSize] = null;
+
+ --size;
+ return removedValue;
+ }
+
+ public void adjustModifiedElement(@Nonnull T element) {
+ final int elementIndex = element.getInternalIndex();
+ if (element == queue[elementIndex]) {
+ adjustElementAtIndex(element, elementIndex);
+ }
+ }
+
+ private void adjustElementAtIndex(T element, int index) {
+ siftDown(index);
+ if (queue[index] == element) {
+ siftUp(index);
+ }
+ }
+
+ private void siftUp(int idx) {
+ final T[] heap = this.queue;
+ final T currentElement = heap[idx];
+ int parentIdx = idx >>> 1;
+
+ while (parentIdx > 0 && isElementLessThen(currentElement, heap[parentIdx])) {
+ moveElementToIdx(heap[parentIdx], idx);
+ idx = parentIdx;
+ parentIdx >>>= 1;
+ }
+
+ moveElementToIdx(currentElement, idx);
+ }
+
+ private void siftDown(int idx) {
+ final T[] heap = this.queue;
+ final int heapSize = this.size;
+
+ final T currentElement = heap[idx];
+ int firstChildIdx = idx << 1;
+ int secondChildIdx = firstChildIdx + 1;
+
+ if (isElementIndexValid(secondChildIdx, heapSize) &&
+ isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ firstChildIdx = secondChildIdx;
+ }
+
+ while (isElementIndexValid(firstChildIdx, heapSize) &&
+ isElementLessThen(heap[firstChildIdx], currentElement)) {
+ moveElementToIdx(heap[firstChildIdx], idx);
+ idx = firstChildIdx;
+ firstChildIdx = idx << 1;
+ secondChildIdx = firstChildIdx + 1;
+
+ if (isElementIndexValid(secondChildIdx, heapSize) &&
+ isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ firstChildIdx = secondChildIdx;
+ }
+ }
+
+ moveElementToIdx(currentElement, idx);
+ }
+
+ private boolean isElementIndexValid(int elementIndex, int heapSize) {
+ return elementIndex <= heapSize;
+ }
+
+ private boolean isElementLessThen(T a, T b) {
+ return elementComparator.compare(a, b) < 0;
+ }
+
+ private void moveElementToIdx(T element, int idx) {
+ queue[idx] = element;
+ element.setInternalIndex(idx);
+ }
+
+ private int increaseSizeByOne() {
+ final int oldArraySize = queue.length;
+ final int minRequiredNewSize = ++size;
+ if (minRequiredNewSize >= oldArraySize) {
+ final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1;
+ resizeQueueArray(oldArraySize + grow, minRequiredNewSize);
+ }
+ // TODO implement shrinking as well?
+ return minRequiredNewSize;
+ }
+
+ private void resizeForBulkLoad(int totalSize) {
+ if (totalSize > queue.length) {
+ int desiredSize = totalSize + (totalSize >>> 3);
+ resizeQueueArray(desiredSize, totalSize);
+ }
+ }
+
+ private void resizeQueueArray(int desiredSize, int minRequiredSize) {
+ if (isValidArraySize(desiredSize)) {
+ queue = Arrays.copyOf(queue, desiredSize);
+ } else if (isValidArraySize(minRequiredSize)) {
+ queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
+ } else {
+ throw new OutOfMemoryError("Required minimum heap size " + minRequiredSize +
+ " exceeds maximum size of " + MAX_ARRAY_SIZE + ".");
+ }
+ }
+
+ private static boolean isValidArraySize(int size) {
+ return size >= 0 && size <= MAX_ARRAY_SIZE;
+ }
+
+ /**
+ * {@link Iterator} implementation for {@link HeapPriorityQueue}.
+ * {@link Iterator#remove()} is not supported.
+ */
+ private class HeapIterator implements CloseableIterator<T> {
+
+ private int iterationIdx;
+
+ HeapIterator() {
+ this.iterationIdx = QUEUE_HEAD_INDEX - 1;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterationIdx < size;
+ }
+
+ @Override
+ public T next() {
+ if (iterationIdx >= size) {
+ throw new NoSuchElementException("Iterator has no next element.");
+ }
+ return queue[++iterationIdx];
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
new file mode 100644
index 0000000..a84758b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for objects that can be managed by a {@link HeapPriorityQueue}. An element exposes a single internal
+ * index, so such an object can only be contained in at most one {@link HeapPriorityQueue} at a time.
+ */
+@Internal
+public interface HeapPriorityQueueElement {
+
+ /**
+ * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
+ * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
+ * elements are removed from a {@link HeapPriorityQueue}, so callers should not rely on it to test containment.
+ */
+ int NOT_CONTAINED = Integer.MIN_VALUE;
+
+ /**
+ * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
+ *
+ * @return the index that was last set through {@link #setInternalIndex(int)}.
+ */
+ int getInternalIndex();
+
+ /**
+ * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
+ * {@link HeapPriorityQueue}.
+ *
+ * @param newIndex the new index in the internal array of the owning {@link HeapPriorityQueue}.
+ */
+ void setInternalIndex(int newIndex);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
new file mode 100644
index 0000000..61313e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Priority queue with set semantics that extends {@link HeapPriorityQueue}. In addition to the heap, one hash map per
+ * key-group of the local key-group range tracks the contained elements, so that duplicates are rejected on insert and
+ * the stored instance can be looked up on removal. Elements are identified through their
+ * {@link Object#equals(Object)} and {@link Object#hashCode()} implementations.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>Shrinking of the heap and of the de-duplication maps.</li>
+ * <li>Replacing the de-duplication maps with a more efficient custom implementation. In particular, a hash set
+ * would be enough if it could return the existing element when an insert is rejected.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
+
+ /**
+ * Extracts the key from an element; the key determines the key-group of the element.
+ */
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /**
+ * One map per key-group of the local range, indexed by the key-group id relative to the start of the range. Each
+ * map maps a contained element to itself and is used for de-duplication and for lookups on removal.
+ */
+ private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
+
+ /**
+ * The key-group range of the elements that are managed by this queue.
+ */
+ private final KeyGroupRange keyGroupRange;
+
+ /**
+ * The total number of key-groups of the job.
+ */
+ private final int totalNumberOfKeyGroups;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param keyExtractor function to extract a key from the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ * @param keyGroupRange the key-group range of the elements in this set.
+ * @param totalNumberOfKeyGroups the total number of key-groups of the job.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueueSet(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnegative int minimumCapacity,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalNumberOfKeyGroups) {
+
+ super(elementComparator, minimumCapacity);
+
+ this.keyExtractor = keyExtractor;
+ this.keyGroupRange = keyGroupRange;
+ this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+
+ final int numLocalKeyGroups = keyGroupRange.getNumberOfKeyGroups();
+ // spread the requested capacity evenly across the per-key-group maps
+ final int initialMapCapacity = 1 + minimumCapacity / numLocalKeyGroups;
+ final HashMap<T, T>[] maps = new HashMap[numLocalKeyGroups];
+ for (int localIndex = 0; localIndex < maps.length; localIndex++) {
+ maps[localIndex] = new HashMap<>(initialMapCapacity);
+ }
+ this.deduplicationMapsByKeyGroup = maps;
+ }
+
+ /**
+ * Removes the head of the heap and drops it from the de-duplication map of its key-group.
+ *
+ * @return the instance that was stored in the de-duplication map, or <code>null</code> if the queue was empty.
+ */
+ @Override
+ @Nullable
+ public T poll() {
+ final T polled = super.poll();
+ if (polled == null) {
+ return null;
+ }
+ return getDedupMapForElement(polled).remove(polled);
+ }
+
+ /**
+ * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
+ * no equal element (determined by {@link Object#equals(Object)}) is already contained.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T element) {
+ if (getDedupMapForElement(element).putIfAbsent(element, element) != null) {
+ // an equal element is already contained, so the queue stays unchanged
+ return false;
+ }
+ return super.add(element);
+ }
+
+ /**
+ * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
+ * via {@link Object#equals(Object)}. The instance that is stored in the de-duplication map is the one that is
+ * removed from the heap.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ final T stored = getDedupMapForElement(toRemove).remove(toRemove);
+ if (stored == null) {
+ return false;
+ }
+ return super.remove(stored);
+ }
+
+ /**
+ * Clears the heap and all de-duplication maps.
+ */
+ @Override
+ public void clear() {
+ super.clear();
+ for (int localIndex = 0; localIndex < deduplicationMapsByKeyGroup.length; ++localIndex) {
+ deduplicationMapsByKeyGroup[localIndex].clear();
+ }
+ }
+
+ /**
+ * Returns an unmodifiable view of the set of all elements in the given key-group.
+ *
+ * @param keyGroupIdx the global id of the key-group; must be contained in the key-group range of this queue.
+ */
+ @Nonnull
+ public Set<T> getElementsForKeyGroup(@Nonnegative int keyGroupIdx) {
+ final Set<T> elementsOfKeyGroup = getDedupMapForKeyGroup(keyGroupIdx).keySet();
+ return Collections.unmodifiableSet(elementsOfKeyGroup);
+ }
+
+ /**
+ * Returns one unmodifiable element set view per key-group of the local range, in the order of the key-groups.
+ */
+ @VisibleForTesting
+ @SuppressWarnings("unchecked")
+ @Nonnull
+ public List<Set<T>> getElementsByKeyGroup() {
+ final List<Set<T>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length);
+ for (HashMap<T, T> dedupMap : deduplicationMapsByKeyGroup) {
+ result.add(Collections.unmodifiableSet(dedupMap.keySet()));
+ }
+ return result;
+ }
+
+ /**
+ * Returns the de-duplication map for the given global key-group id.
+ */
+ private HashMap<T, T> getDedupMapForKeyGroup(
+ @Nonnegative int keyGroupIdx) {
+ final int localIndex = globalKeyGroupToLocalIndex(keyGroupIdx);
+ return deduplicationMapsByKeyGroup[localIndex];
+ }
+
+ /**
+ * Returns the de-duplication map of the key-group to which the key of the given element is assigned.
+ */
+ private HashMap<T, T> getDedupMapForElement(T element) {
+ final Object key = keyExtractor.extractKeyFromElement(element);
+ final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, totalNumberOfKeyGroups);
+ return getDedupMapForKeyGroup(keyGroup);
+ }
+
+ /**
+ * Translates a global key-group id into an index into the array of de-duplication maps. Fails the argument check
+ * if the key-group is not part of the local key-group range.
+ */
+ private int globalKeyGroupToLocalIndex(int keyGroup) {
+ checkArgument(keyGroupRange.contains(keyGroup));
+ return keyGroup - keyGroupRange.getStartKeyGroup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
new file mode 100644
index 0000000..af4d54f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of the sub-queues, ordered by their head elements. Each sub-queue is the partition for a key-group. */
+ @Nonnull
+ private final HeapPriorityQueue<PQ> heapOfkeyGroupedHeaps;
+
+ /** All sub-queues from the heap of heaps, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupedHeaps;
+
+ /** Function to extract the key from contained elements. */
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /** The total number of key-groups (in the job). */
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ /** The smallest key-group id with a subpartition managed by this ordered set. */
+ @Nonnegative
+ private final int firstKeyGroup;
+
+ /**
+ * Creates a new queue with one sub-queue for each key-group in the given range.
+ *
+ * @param keyExtractor function to extract the key from an element.
+ * @param elementComparator comparator that determines the order of the elements.
+ * @param orderedCacheFactory factory that creates the sub-queue for each key-group.
+ * @param keyGroupRange the key-group range for which sub-queues are created.
+ * @param totalKeyGroups the total number of key-groups in the job.
+ */
+ @SuppressWarnings("unchecked")
+ public KeyGroupPartitionedPriorityQueue(
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.keyExtractor = keyExtractor;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+ this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>(
+ new InternalPriorityQueueComparator<>(elementComparator),
+ keyGroupRange.getNumberOfKeyGroups());
+ // create one sub-queue per key-group and register it in both the lookup array and the heap of heaps
+ for (int i = 0; i < keyGroupedHeaps.length; i++) {
+ final PQ keyGroupSubHeap =
+ orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
+ keyGroupedHeaps[i] = keyGroupSubHeap;
+ heapOfkeyGroupedHeaps.add(keyGroupSubHeap);
+ }
+ }
+
+ @Nullable
+ @Override
+ public T poll() {
+ final PQ headList = heapOfkeyGroupedHeaps.peek();
+ final T head = headList.poll();
+ // the head of the sub-queue changed, so its position in the heap of heaps must be re-established
+ heapOfkeyGroupedHeaps.adjustModifiedElement(headList);
+ return head;
+ }
+
+ @Nullable
+ @Override
+ public T peek() {
+ return heapOfkeyGroupedHeaps.peek().peek();
+ }
+
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ final PQ list = getKeyGroupSubHeapForElement(toAdd);
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.add(toAdd)) {
+ heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+ // could we have a new head?
+ return toAdd.equals(peek());
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ final PQ list = getKeyGroupSubHeapForElement(toRemove);
+
+ // remember the overall head before the removal, so we can tell whether it was the removed element
+ final T oldHead = peek();
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.remove(toRemove)) {
+ heapOfkeyGroupedHeaps.adjustModifiedElement(list);
+ // could we have a new head?
+ return toRemove.equals(oldHead);
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public int size() {
+ int sizeSum = 0;
+ for (PQ list : keyGroupedHeaps) {
+ sizeSum += list.size();
+ }
+ return sizeSum;
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+ if (toAdd == null) {
+ return;
+ }
+
+ // TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
+ for (T element : toAdd) {
+ add(element);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public CloseableIterator<T> iterator() {
+ return new KeyGroupConcatenationIterator<>(keyGroupedHeaps);
+ }
+
+ /** Returns the sub-queue of the key-group to which the key of the given element is assigned. */
+ private PQ getKeyGroupSubHeapForElement(T element) {
+ return keyGroupedHeaps[computeKeyGroupIndex(element)];
+ }
+
+ /** Computes the index into the array of sub-queues for the given element, relative to firstKeyGroup. */
+ private int computeKeyGroupIndex(T element) {
+ final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
+ final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+ return keyGroupId - firstKeyGroup;
+ }
+
+ /**
+ * Iterator for {@link KeyGroupPartitionedPriorityQueue} that concatenates the iterators of all subpartitions. This
+ * iterator is not guaranteeing any order of elements. Using code must {@link #close()} after usage.
+ *
+ * @param <T> the type of iterated elements.
+ * @param <PQS> the type of the iterated subpartitions.
+ */
+ private static final class KeyGroupConcatenationIterator<
+ T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements CloseableIterator<T> {
+
+ /** Array with the subpartitions that we iterate. No null values in the array. */
+ @Nonnull
+ private final PQS[] keyGroupLists;
+
+ /** The index of the subpartition that is opened next. */
+ @Nonnegative
+ private int index;
+
+ /** The iterator of the current subpartition. */
+ @Nonnull
+ private CloseableIterator<T> current;
+
+ private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
+ this.keyGroupLists = keyGroupLists;
+ this.index = 0;
+ this.current = CloseableIterator.empty();
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean currentHasNext = current.hasNext();
+
+ // find the iterator of the next partition that has elements.
+ while (!currentHasNext && index < keyGroupLists.length) {
+ IOUtils.closeQuietly(current);
+ current = keyGroupLists[index++].iterator();
+ currentHasNext = current.hasNext();
+ }
+ return currentHasNext;
+ }
+
+ @Override
+ public T next() {
+ // Position 'current' on the next non-empty subpartition first. Without this, calling next() without a
+ // preceding hasNext() would fail although later subpartitions still contain elements.
+ hasNext();
+ // If everything is exhausted, this delegates to an exhausted iterator, which is expected to signal the
+ // end of the iteration with a NoSuchElementException.
+ return current.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ current.close();
+ }
+ }
+
+ /**
+ * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
+ * from {@link InternalPriorityQueue#peek()}: a queue without a head element is ordered after every queue that has
+ * one, and two queues without a head element are considered equal.
+ *
+ * @param <T> type of the elements in the compared queues.
+ * @param <Q> type of queue.
+ */
+ private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
+ implements Comparator<Q> {
+
+ /** Comparator for the queue elements, so we can compare their heads. */
+ @Nonnull
+ private final Comparator<T> elementComparator;
+
+ InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
+ this.elementComparator = elementComparator;
+ }
+
+ @Override
+ public int compare(Q o1, Q o2) {
+ final T left = o1.peek();
+ final T right = o2.peek();
+ if (left == null) {
+ return (right == null ? 0 : 1);
+ } else {
+ return (right == null ? -1 : elementComparator.compare(left, right));
+ }
+ }
+ }
+
+ /**
+ * Factory that produces the sub-queues that represent the partitions of a {@link KeyGroupPartitionedPriorityQueue}.
+ *
+ * @param <T> type of the elements in the queue set.
+ * @param <PQS> type of the priority queue. Must have set semantics and {@link HeapPriorityQueueElement}.
+ */
+ public interface PartitionQueueSetFactory<T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement> {
+
+ /**
+ * Creates a new queue for a given key-group partition.
+ *
+ * @param keyGroupId the key-group of the elements managed by the produced queue.
+ * @param numKeyGroups the total number of key-groups in the job.
+ * @param elementComparator the comparator that determines the order of the managed elements.
+ * @return a new queue for the given key-group.
+ */
+ @Nonnull
+ PQS create(@Nonnegative int keyGroupId, @Nonnegative int numKeyGroups, @Nonnull Comparator<T> elementComparator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
new file mode 100644
index 0000000..0e7d9dd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.Preconditions;
+
+import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Comparator;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} that
+ * keeps the cached elements in an AVL-Tree. We chose the implementation from fastutil over JDK for performance
+ * reasons.
+ *
+ * <p>Maintainer notes: We can consider the following potential performance improvements. First, we could introduce a
+ * bulk-load method to OrderedSetCache to exploit the fact that adding from an OrderedSetStore is already happening in
+ * sorted order, e.g. there are more efficient ways to construct search trees from sorted elements. Second, we could
+ * replace the internal AVL-Tree with an extended variant of {@link HeapPriorityQueueSet} that is organized as a
+ * Min-Max-Heap.
+ *
+ * @param <E> type of the contained elements.
+ */
+public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+
+ /** The tree that holds the cached elements, ordered by the element comparator. */
+ @Nonnull
+ private final ObjectAVLTreeSet<E> avlTree;
+
+ /** The element comparator. */
+ @Nonnull
+ private final Comparator<E> elementComparator;
+
+ /** The number of elements at which the cache reports to be full. */
+ @Nonnegative
+ private final int capacity;
+
+ /**
+ * Creates a new {@link TreeOrderedSetCache} with the given capacity and element comparator.
+ *
+ * @param elementComparator comparator for the cached elements.
+ * @param capacity the capacity of the cache. Must be greater than 0.
+ */
+ public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, @Nonnegative int capacity) {
+ Preconditions.checkArgument(capacity > 0, "Cache capacity must be greater than 0.");
+ this.elementComparator = elementComparator;
+ this.capacity = capacity;
+ this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
+ }
+
+ /** Adds the element to the cache. The cache is expected not to be full, which is only checked by an assertion. */
+ @Override
+ public void add(@Nonnull E element) {
+ assert !isFull();
+ avlTree.add(element);
+ }
+
+ @Override
+ public void remove(@Nonnull E element) {
+ avlTree.remove(element);
+ }
+
+ @Override
+ public boolean isFull() {
+ return capacity == avlTree.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return avlTree.isEmpty();
+ }
+
+ /**
+ * Returns true if the cache is empty or if the given element is ordered strictly before the last cached element.
+ */
+ @Override
+ public boolean isInLowerBound(@Nonnull E toCheck) {
+ if (avlTree.isEmpty()) {
+ return true;
+ }
+ return elementComparator.compare(peekLast(), toCheck) > 0;
+ }
+
+ /** Removes and returns the first element of the tree, or returns null if the cache is empty. */
+ @Nullable
+ @Override
+ public E removeFirst() {
+ if (avlTree.isEmpty()) {
+ return null;
+ }
+ final E smallest = avlTree.first();
+ avlTree.remove(smallest);
+ return smallest;
+ }
+
+ /** Removes and returns the last element of the tree, or returns null if the cache is empty. */
+ @Nullable
+ @Override
+ public E removeLast() {
+ if (avlTree.isEmpty()) {
+ return null;
+ }
+ final E largest = avlTree.last();
+ avlTree.remove(largest);
+ return largest;
+ }
+
+ @Nullable
+ @Override
+ public E peekFirst() {
+ if (avlTree.isEmpty()) {
+ return null;
+ }
+ return avlTree.first();
+ }
+
+ @Nullable
+ @Override
+ public E peekLast() {
+ if (avlTree.isEmpty()) {
+ return null;
+ }
+ return avlTree.last();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
new file mode 100644
index 0000000..c0c3ba4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ /**
+ * Inserts {@code count} random elements into the given queue and records them in {@code checkSet}. Only elements
+ * that are not yet contained in {@code checkSet} are inserted. Roughly every tenth element reuses the priority
+ * of the previously inserted element, so that the queue also sees duplicate priorities.
+ *
+ * <p>For every insert, this asserts that {@code add} reported a head change if the new element is the head
+ * afterwards. If the queue was empty on entry, this also asserts that the final queue size equals {@code count}.
+ *
+ * @param priorityQueue the queue that receives the elements.
+ * @param checkSet set that tracks the inserted elements and rejects candidates it already contains.
+ * @param count the number of elements to insert.
+ */
+ protected static void insertRandomElements(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ // Long.MIN_VALUE serves as marker for "no priority to duplicate"
+ long duplicatePriority = Long.MIN_VALUE;
+
+ // the final size is only predictable if we start from an empty queue
+ final boolean checkEndSizes = priorityQueue.isEmpty();
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ // draw candidates until we find one that is not yet tracked in the check set
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ // reuse the remembered priority once, then reset the marker
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ // with a chance of 1 in 10, the next element gets the same priority as this one
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ // a new head must always be indicated by the return value of add
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+
+ if (checkEndSizes) {
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+ }
+
+ /**
+ * Fills a queue with random elements and drains it through peek/poll. Checks that peek and poll agree, that the
+ * size shrinks by one per poll and that the priorities never decrease.
+ */
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ final HashSet<TestElement> remaining = new HashSet<>(testSize);
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(initialCapacity);
+
+ insertRandomElements(queue, remaining, testSize);
+
+ int expectedSize = queue.size();
+ Assert.assertEquals(testSize, expectedSize);
+
+ long previousPriority = Long.MIN_VALUE;
+ for (TestElement head = queue.peek(); head != null; head = queue.peek()) {
+ Assert.assertFalse(queue.isEmpty());
+ Assert.assertEquals(expectedSize, queue.size());
+ Assert.assertEquals(head, queue.poll());
+ Assert.assertTrue(remaining.remove(head));
+ Assert.assertTrue(head.getPriority() >= previousPriority);
+ previousPriority = head.getPriority();
+ expectedSize--;
+ }
+
+ Assert.assertTrue(queue.isEmpty());
+ Assert.assertEquals(0, queue.size());
+ Assert.assertEquals(0, remaining.size());
+ }
+
+ /**
+ * Mixes removes, polls and inserts and checks that the poll order stays non-decreasing in priority. Each round
+ * removes one element of the check set from the queue, drains the queue completely while occasionally inserting
+ * a few random elements, and finally re-adds the remaining elements of the check set.
+ */
+ @Test
+ public void testRemoveInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ final int testSize = 300;
+ final int addCounterMax = testSize / 4;
+ // countdown of polls until the next batch of random inserts
+ int iterationsTillNextAdds = random.nextInt(addCounterMax);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomElements(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ // take the first element in the iteration order of the check set and remove it from set and queue
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ final boolean removesHead = element.equals(priorityQueue.peek());
+
+ if (removesHead) {
+ // removing the head must be reported as a head change
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+
+ // if the head was removed, all following elements must have at least its priority
+ long lastPriorityValue = removesHead ? element.getPriority() : Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ if (--iterationsTillNextAdds == 0) {
+ // some random adds
+ iterationsTillNextAdds = random.nextInt(addCounterMax);
+ // the inserts are tracked in a throwaway copy, so the check set itself does not grow
+ insertRandomElements(priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3));
+ // the new elements can have any priority, so the lower bound restarts at the current head
+ lastPriorityValue = priorityQueue.peek().getPriority();
+ }
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ // refill the drained queue with the elements that are still in the check set
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ /**
+ * Checks that poll returns null on an empty queue and otherwise returns all inserted elements in non-decreasing
+ * order of priority.
+ */
+ @Test
+ public void testPoll() {
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(3);
+
+ // polling an empty queue yields null
+ Assert.assertNull(queue.poll());
+
+ final int testSize = 345;
+ final HashSet<TestElement> expectedElements = new HashSet<>(testSize);
+ insertRandomElements(queue, expectedElements, testSize);
+
+ long previousPriority = Long.MIN_VALUE;
+ while (!queue.isEmpty()) {
+ final TestElement polled = queue.poll();
+ Assert.assertNotNull(polled);
+ Assert.assertTrue(expectedElements.remove(polled));
+ Assert.assertTrue(polled.getPriority() >= previousPriority);
+ previousPriority = polled.getPriority();
+ }
+ Assert.assertTrue(expectedElements.isEmpty());
+
+ // the drained queue behaves like an empty queue again
+ Assert.assertNull(queue.poll());
+ }
+
+ /**
+ * Checks isEmpty before an insert, after an insert and after polling the only element.
+ */
+ @Test
+ public void testIsEmpty() {
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+ Assert.assertTrue(queue.isEmpty());
+
+ final TestElement onlyElement = new TestElement(4711L, 42L);
+ final boolean headChanged = queue.add(onlyElement);
+ Assert.assertTrue(headChanged);
+ Assert.assertFalse(queue.isEmpty());
+
+ queue.poll();
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ /**
+ * Bulk-adds every element three times (two deep copies plus the original) and checks the resulting size and
+ * content. With set semantics every element must be contained exactly once, otherwise three times.
+ */
+ @Test
+ public void testBulkAddRestoredElements() throws Exception {
+ final int testSize = 10;
+ final HashSet<TestElement> originals = new HashSet<>(testSize);
+ for (int i = 0; i < testSize; ++i) {
+ originals.add(new TestElement(i, i));
+ }
+
+ // two deep copies of every original element
+ final List<TestElement> copiedTwice = new ArrayList<>(originals.size() * 2);
+ for (TestElement original : originals) {
+ for (int copy = 0; copy < 2; ++copy) {
+ copiedTwice.add(original.deepCopy());
+ }
+ }
+
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+ queue.addAll(copiedTwice);
+ queue.addAll(originals);
+
+ final int expectedSize;
+ if (testSetSemanticsAgainstDuplicateElements()) {
+ expectedSize = originals.size();
+ } else {
+ expectedSize = 3 * originals.size();
+ }
+ Assert.assertEquals(expectedSize, queue.size());
+
+ try (final CloseableIterator<TestElement> queueIterator = queue.iterator()) {
+ while (queueIterator.hasNext()) {
+ if (testSetSemanticsAgainstDuplicateElements()) {
+ // every element may show up only once
+ Assert.assertTrue(originals.remove(queueIterator.next()));
+ } else {
+ Assert.assertTrue(originals.contains(queueIterator.next()));
+ }
+ }
+ }
+
+ if (testSetSemanticsAgainstDuplicateElements()) {
+ Assert.assertTrue(originals.isEmpty());
+ }
+ }
+
+ @Test
+ public void testIterator() throws Exception {
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+
+ // an iterator over an empty queue reports no elements and rejects next()
+ try (CloseableIterator<TestElement> emptyIterator = queue.iterator()) {
+ Assert.assertFalse(emptyIterator.hasNext());
+ try {
+ emptyIterator.next();
+ Assert.fail();
+ } catch (NoSuchElementException expected) {
+ // this is the expected outcome
+ }
+ }
+
+ // an iterator over a filled queue returns each tracked element exactly once
+ final int numElements = 10;
+ final HashSet<TestElement> remaining = new HashSet<>(numElements);
+ insertRandomElements(queue, remaining, numElements);
+ try (CloseableIterator<TestElement> dataIterator = queue.iterator()) {
+ while (dataIterator.hasNext()) {
+ Assert.assertTrue(remaining.remove(dataIterator.next()));
+ }
+ Assert.assertTrue(remaining.isEmpty());
+ }
+ }
+
+ @Test
+ public void testAdd() {
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+ final TestElement later = new TestElement(4711L, 42L);
+ final TestElement earlier = new TestElement(815L, 23L);
+ Assert.assertTrue(queue.add(later));
+ if (testSetSemanticsAgainstDuplicateElements()) {
+ // an equal copy must not increase the size of a set
+ queue.add(later.deepCopy());
+ }
+ Assert.assertEquals(1, queue.size());
+ Assert.assertTrue(queue.add(earlier));
+ Assert.assertEquals(2, queue.size());
+ // the element with the smaller priority value (23) is polled first
+ Assert.assertEquals(earlier, queue.poll());
+ Assert.assertEquals(1, queue.size());
+ Assert.assertEquals(later, queue.poll());
+ Assert.assertEquals(0, queue.size());
+ }
+
+ @Test
+ public void testRemove() {
+ final InternalPriorityQueue<TestElement> queue = newPriorityQueue(1);
+ final TestElement element = new TestElement(4711L, 42L);
+
+ // only checked for set semantics: removing an absent element reports false
+ final boolean setSemantics = testSetSemanticsAgainstDuplicateElements();
+ if (setSemantics) {
+ Assert.assertFalse(queue.remove(element));
+ }
+
+ Assert.assertTrue(queue.add(element));
+ Assert.assertTrue(queue.remove(element));
+ if (testSetSemanticsAgainstDuplicateElements()) {
+ Assert.assertFalse(queue.remove(element));
+ }
+ Assert.assertTrue(queue.isEmpty());
+ }
+
+ protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity); // creates a fresh queue for a test case; implementations may ignore the capacity hint
+
+ protected abstract boolean testSetSemanticsAgainstDuplicateElements(); // true if the queue under test is expected to de-duplicate equal elements (set semantics)
+
+ /**
+ * Test payload with a key and a priority; equality and hash code ignore the internal index.
+ */
+ protected static class TestElement implements HeapPriorityQueueElement {
+
+ private final long key;
+ private final long priority;
+ private int internalIndex = NOT_CONTAINED;
+
+ public TestElement(long key, long priority) {
+ this.key = key;
+ this.priority = priority;
+ }
+
+ public long getKey() {
+ return this.key;
+ }
+
+ public long getPriority() {
+ return this.priority;
+ }
+
+ @Override
+ public int getInternalIndex() {
+ return this.internalIndex;
+ }
+
+ @Override
+ public void setInternalIndex(int newIndex) {
+ this.internalIndex = newIndex;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TestElement other = (TestElement) o;
+ final boolean sameKey = getKey() == other.getKey();
+ return sameKey && getPriority() == other.getPriority();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getKey(), getPriority());
+ }
+
+ /** Creates a new element with the same key and priority whose internal index is NOT_CONTAINED. */
+ public TestElement deepCopy() {
+ return new TestElement(key, priority);
+ }
+ }
+
+ /**
+ * Stateless serializer for {@link TestElement}; all instances are equal. The serialization format writes the
+ * sign-flipped priority before the key, which allows lexicographic ordering by {@link TestElement#getPriority}.
+ */
+ protected static class TestElementSerializer extends TypeSerializer<TestElement> {
+
+ public static final TestElementSerializer INSTANCE = new TestElementSerializer();
+
+ private TestElementSerializer() {
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public TypeSerializer<TestElement> duplicate() {
+ return this; // no state to protect, so the instance can be shared
+ }
+
+ @Override
+ public TestElement createInstance() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TestElement copy(TestElement from) {
+ return new TestElement(from.key, from.priority);
+ }
+
+ @Override
+ public TestElement copy(TestElement from, TestElement reuse) {
+ return copy(from); // the reuse instance is ignored
+ }
+
+ @Override
+ public int getLength() {
+ return 2 * Long.BYTES; // two longs: priority and key
+ }
+
+ @Override
+ public void serialize(TestElement record, DataOutputView target) throws IOException {
+ // serialize priority first, so that we have correct order in RocksDB. We flip the sign bit for correct
+ // lexicographic order.
+ target.writeLong(MathUtils.flipSignBit(record.getPriority()));
+ target.writeLong(record.getKey());
+ }
+
+ @Override
+ public TestElement deserialize(DataInputView source) throws IOException {
+ long prio = MathUtils.flipSignBit(source.readLong()); // undo the sign flip applied by serialize
+ long key = source.readLong();
+ return new TestElement(key, prio);
+ }
+
+ @Override
+ public TestElement deserialize(TestElement reuse, DataInputView source) throws IOException {
+ return deserialize(source); // the reuse instance is ignored
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ serialize(deserialize(source), target);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof TestElementSerializer; // stateless: reflexive, and equal to every other instance
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TestElementSerializer; // kept in line with equals
+ }
+
+ @Override
+ public int hashCode() {
+ return 4711; // constant, hence consistent with equals
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompatibilityResult<TestElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
index 9298743..e6b7739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupPartitionerTestBase.java
@@ -46,14 +46,14 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
new DataOutputViewStreamWrapper(new ByteArrayOutputStreamWithPos(0));
@Nonnull
- protected final KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction;
+ protected final KeyExtractorFunction<T> keyExtractorFunction;
@Nonnull
protected final Function<Random, T> elementGenerator;
protected KeyGroupPartitionerTestBase(
@Nonnull Function<Random, T> elementGenerator,
- @Nonnull KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction) {
+ @Nonnull KeyExtractorFunction<T> keyExtractorFunction) {
this.elementGenerator = elementGenerator;
this.keyExtractorFunction = keyExtractorFunction;
@@ -138,7 +138,7 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
static final class ValidatingElementWriterDummy<T> implements KeyGroupPartitioner.ElementWriterFunction<T> {
@Nonnull
- private final KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction;
+ private final KeyExtractorFunction<T> keyExtractorFunction;
@Nonnegative
private final int numberOfKeyGroups;
@Nonnull
@@ -147,7 +147,7 @@ public abstract class KeyGroupPartitionerTestBase<T> extends TestLogger {
private int currentKeyGroup;
ValidatingElementWriterDummy(
- @Nonnull KeyGroupPartitioner.KeyExtractorFunction<T> keyExtractorFunction,
+ @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
@Nonnegative int numberOfKeyGroups,
@Nonnull Set<T> allElementsSet) {
this.keyExtractorFunction = keyExtractorFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
new file mode 100644
index 0000000..79fd556
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSetTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+/**
+ * Test for {@link CachingInternalPriorityQueueSet}.
+ */
+public abstract class CachingInternalPriorityQueueSetTestBase extends InternalPriorityQueueTestBase {
+
+ @Override
+ protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+ final CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache = createOrderedSetCache();
+ final CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store = createOrderedSetStore();
+ return new CachingInternalPriorityQueueSet<>(cache, store);
+ }
+
+ @Override
+ protected boolean testSetSemanticsAgainstDuplicateElements() {
+ return true;
+ }
+
+ protected abstract CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createOrderedSetStore();
+
+ protected abstract CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> createOrderedSetCache();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
new file mode 100644
index 0000000..618da4e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Test for {@link HeapPriorityQueueSet}.
+ */
+public class HeapPriorityQueueSetTest extends HeapPriorityQueueTest {
+
+ @Override
+ protected HeapPriorityQueueSet<TestElement> newPriorityQueue(int initialCapacity) {
+ return new HeapPriorityQueueSet<>(
+ TEST_ELEMENT_COMPARATOR,
+ KEY_EXTRACTOR_FUNCTION,
+ initialCapacity,
+ KEY_GROUP_RANGE,
+ KEY_GROUP_RANGE.getNumberOfKeyGroups());
+ }
+
+ @Override
+ protected boolean testSetSemanticsAgainstDuplicateElements() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
new file mode 100644
index 0000000..8ffb8b8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Test for {@link HeapPriorityQueue}.
+ */
+public class HeapPriorityQueueTest extends InternalPriorityQueueTestBase {
+
+ @Test
+ public void testClear() {
+ HeapPriorityQueue<TestElement> priorityQueueSet =
+ newPriorityQueue(1);
+
+ int count = 10;
+ HashSet<TestElement> checkSet = new HashSet<>(count);
+ insertRandomElements(priorityQueueSet, checkSet, count);
+ Assert.assertEquals(count, priorityQueueSet.size());
+ priorityQueueSet.clear();
+ Assert.assertEquals(0, priorityQueueSet.size());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testToArray() {
+
+ final int testSize = 10;
+
+ List<TestElement[]> tests = new ArrayList<>(2);
+ tests.add(new TestElement[0]);
+ tests.add(new TestElement[testSize]);
+ tests.add(new TestElement[testSize + 1]);
+
+ for (TestElement[] testArray : tests) {
+
+ Arrays.fill(testArray, new TestElement(42L, 4711L));
+
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ HeapPriorityQueue<TestElement> timerPriorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertEquals(testArray.length, timerPriorityQueue.toArray(testArray).length);
+
+ insertRandomElements(timerPriorityQueue, checkSet, testSize);
+
+ TestElement[] toArray = timerPriorityQueue.toArray(testArray);
+
+ Assert.assertEquals((testArray.length >= testSize), (testArray == toArray));
+
+ int count = 0;
+ for (TestElement o : toArray) {
+ if (o == null) {
+ break;
+ }
+ Assert.assertTrue(checkSet.remove(o));
+ ++count;
+ }
+
+ Assert.assertEquals(timerPriorityQueue.size(), count);
+ Assert.assertTrue(checkSet.isEmpty());
+ }
+ }
+
+ @Override
+ protected HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+ return new HeapPriorityQueue<>(TEST_ELEMENT_COMPARATOR, initialCapacity);
+ }
+
+ @Override
+ protected boolean testSetSemanticsAgainstDuplicateElements() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
new file mode 100644
index 0000000..277de19
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.InternalPriorityQueueTestBase;
+
+/**
+ * Test for {@link KeyGroupPartitionedPriorityQueue}.
+ */
+public class KeyGroupPartitionedPriorityQueueTest extends InternalPriorityQueueTestBase {
+
+ @Override
+ protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+ return new KeyGroupPartitionedPriorityQueue<>(
+ KEY_EXTRACTOR_FUNCTION,
+ TEST_ELEMENT_COMPARATOR,
+ newFactory(initialCapacity),
+ KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups());
+ }
+
+ protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+ TestElement, CachingInternalPriorityQueueSet<TestElement>> newFactory(int initialCapacity) {
+
+ return (keyGroupId, numKeyGroups, elementComparator) -> {
+ CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache =
+ new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
+ CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store =
+ new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR);
+ return new CachingInternalPriorityQueueSet<>(cache, store);
+ };
+ }
+
+ @Override
+ protected boolean testSetSemanticsAgainstDuplicateElements() {
+ return true;
+ }
+}