You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/09 14:13:17 UTC
[2/2] flink git commit: [FLINK-9486][state] Introduce
InternalPriorityQueue as state in keyed state backends
[FLINK-9486][state] Introduce InternalPriorityQueue as state in keyed state backends
This commit does not include the integration with checkpointing.
This closes #6276.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b38f8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b38f8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b38f8f
Branch: refs/heads/master
Commit: 79b38f8f9a79b917d525842cf46087c5b8c40f3d
Parents: b12acea
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jul 4 13:43:49 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jul 9 16:12:51 2018 +0200
----------------------------------------------------------------------
.../KVStateRequestSerializerRocksDBTest.java | 16 +-
.../network/KvStateRequestSerializerTest.java | 19 +-
.../runtime/state/InternalPriorityQueue.java | 12 +
.../state/KeyGroupedInternalPriorityQueue.java | 38 ++++
.../flink/runtime/state/KeyedStateBackend.java | 3 +-
.../flink/runtime/state/PriorityComparator.java | 42 ++++
.../runtime/state/PriorityQueueSetFactory.java | 46 ++++
.../state/TieBreakingPriorityComparator.java | 122 ++++++++++
.../state/filesystem/FsStateBackend.java | 6 +-
.../heap/CachingInternalPriorityQueueSet.java | 26 ++-
.../state/heap/HeapKeyedStateBackend.java | 28 ++-
.../runtime/state/heap/HeapPriorityQueue.java | 35 ++-
.../state/heap/HeapPriorityQueueSet.java | 46 ++--
.../state/heap/HeapPriorityQueueSetFactory.java | 69 ++++++
.../heap/KeyGroupPartitionedPriorityQueue.java | 63 ++++--
.../runtime/state/heap/TreeOrderedSetCache.java | 7 +
.../state/memory/MemoryStateBackend.java | 7 +-
.../state/InternalPriorityQueueTestBase.java | 12 +-
.../state/StateSnapshotCompressionTest.java | 12 +-
.../state/heap/HeapPriorityQueueSetTest.java | 2 +-
.../state/heap/HeapPriorityQueueTest.java | 2 +-
.../state/heap/HeapStateBackendTestBase.java | 10 +-
.../KeyGroupPartitionedPriorityQueueTest.java | 2 +-
.../streaming/state/RockDBBackendOptions.java | 38 ++++
.../state/RocksDBKeyedStateBackend.java | 171 +++++++++++++-
.../streaming/state/RocksDBOrderedSetStore.java | 13 +-
.../streaming/state/RocksDBStateBackend.java | 22 +-
...nalPriorityQueueSetWithRocksDBStoreTest.java | 1 -
.../state/RocksDBOrderedSetStoreTest.java | 1 -
.../state/RocksDBStateBackendTest.java | 3 +-
.../api/operators/AbstractStreamOperator.java | 6 +-
.../api/operators/HeapInternalTimerService.java | 85 +++----
.../operators/InternalTimeServiceManager.java | 80 ++++---
.../streaming/api/operators/InternalTimer.java | 22 ++
.../InternalTimerServiceSerializationProxy.java | 65 +++---
.../StreamTaskStateInitializerImpl.java | 1 +
.../api/operators/TimerHeapInternalTimer.java | 23 --
.../api/operators/TimerSerializer.java | 222 +++++++++++++++++++
.../operators/HeapInternalTimerServiceTest.java | 138 ++++++++----
39 files changed, 1234 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index a49fdd2..9ea3198 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -74,9 +75,12 @@ public final class KVStateRequestSerializerRocksDBTest {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0),
- new ExecutionConfig(), false,
- TestLocalRecoveryConfig.disabled()
+ 1,
+ new KeyGroupRange(0, 0),
+ new ExecutionConfig(),
+ false,
+ TestLocalRecoveryConfig.disabled(),
+ RocksDBStateBackend.PriorityQueueStateType.HEAP
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -112,10 +116,12 @@ public final class KVStateRequestSerializerRocksDBTest {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0),
+ 1,
+ new KeyGroupRange(0, 0),
new ExecutionConfig(),
false,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ RocksDBStateBackend.PriorityQueueStateType.HEAP);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 2ba7507..73f8831 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
@@ -185,18 +186,19 @@ public class KvStateRequestSerializerTest {
@Test
public void testListSerialization() throws Exception {
final long key = 0L;
-
+ final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
- 1,
- new KeyGroupRange(0, 0),
+ keyGroupRange.getNumberOfKeyGroups(),
+ keyGroupRange,
async,
new ExecutionConfig(),
- TestLocalRecoveryConfig.disabled()
+ TestLocalRecoveryConfig.disabled(),
+ new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -292,18 +294,19 @@ public class KvStateRequestSerializerTest {
@Test
public void testMapSerialization() throws Exception {
final long key = 0L;
-
+ final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
- 1,
- new KeyGroupRange(0, 0),
+ keyGroupRange.getNumberOfKeyGroups(),
+ keyGroupRange,
async,
new ExecutionConfig(),
- TestLocalRecoveryConfig.disabled()
+ TestLocalRecoveryConfig.disabled(),
+ new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
);
longHeapKeyedStateBackend.setCurrentKey(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
index fb3ee82..dc46c8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
/**
* Interface for collection that gives in order access to elements w.r.t their priority.
@@ -36,6 +38,16 @@ import java.util.Collection;
public interface InternalPriorityQueue<T> {
/**
+ * Polls from the top of the queue as long as the queue is not empty and passes the elements to
+ * {@link Consumer} until a {@link Predicate} rejects an offered element. The rejected element is not
+ * removed from the queue and becomes the new head.
+ *
+ * @param canConsume bulk polling ends once this returns false. The rejected element is neither removed nor consumed.
+ * @param consumer consumer function for elements accepted by canConsume.
+ */
+ void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer);
+
+ /**
* Retrieves and removes the first element (w.r.t. the order) of this set,
* or returns {@code null} if this set is empty.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
new file mode 100644
index 0000000..68472e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/**
+ * This interface exists as a (temporary) adapter between the new {@link InternalPriorityQueue} and the old way in which
+ * timers are written in a snapshot. This interface can probably go away once timer state becomes part of the
+ * keyed state backend snapshot.
+ */
+public interface KeyGroupedInternalPriorityQueue<T> extends InternalPriorityQueue<T> {
+
+ /**
+ * Returns the subset of elements in the priority queue that belongs to the given key-group, within the operator's
+ * key-group range.
+ */
+ @Nonnull
+ Set<T> getSubsetForKeyGroup(int keyGroupId);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index ad75a1f..7ba14b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -31,7 +31,8 @@ import java.util.stream.Stream;
*
* @param <K> The key by which state is keyed.
*/
-public interface KeyedStateBackend<K> extends InternalKeyContext<K>, KeyedStateFactory, Disposable {
+public interface KeyedStateBackend<K>
+ extends InternalKeyContext<K>, KeyedStateFactory, PriorityQueueSetFactory, Disposable {
/**
* Sets the current key that is used for partitioned state.
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
new file mode 100644
index 0000000..2f6f5a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * This interface works similar to {@link Comparable} and is used to prioritize between two objects. The main difference
+ * between this interface and {@link Comparable} is that it is not required to follow the usual contract between
+ * {@link Comparable#compareTo(Object)} and {@link Object#equals(Object)}. The contract of this interface is:
+ * When two objects are equal, they indicate the same priority, but indicating the same priority does not require that
+ * both objects are equal.
+ *
+ * @param <T> type of the compared objects.
+ */
+@FunctionalInterface
+public interface PriorityComparator<T> {
+
+ /**
+ * Compares two objects for priority. Returns a negative integer, zero, or a positive integer as the first
+ * argument has lower, equal to, or higher priority than the second.
+ * @param left left operand in the comparison by priority.
+ * @param right right operand in the comparison by priority.
+ * @return a negative integer, zero, or a positive integer as the first argument has lower, equal to, or higher
+ * priority than the second.
+ */
+ int comparePriority(T left, T right);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
new file mode 100644
index 0000000..6f509c0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Factory for {@link KeyGroupedInternalPriorityQueue} instances.
+ */
+public interface PriorityQueueSetFactory {
+
+ /**
+ * Creates a {@link KeyGroupedInternalPriorityQueue}.
+ *
+ * @param stateName unique name associated with this queue.
+ * @param byteOrderedElementSerializer a serializer with a format that is lexicographically ordered in
+ * alignment with elementPriorityComparator.
+ * @param <T> type of the stored elements.
+ * @return the queue with the specified unique name.
+ */
+ @Nonnull
+ <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
new file mode 100644
index 0000000..4384eb7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * This class is an adapter between {@link PriorityComparator} and a full {@link Comparator} that respects the
+ * contract between {@link Comparator#compare(Object, Object)} and {@link Object#equals(Object)}. This is currently
+ * needed for implementations of
+ * {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache} that are implemented
+ * on top of a data structure that relies on this contract, e.g. a tree set. We should replace this in the near
+ * future.
+ *
+ * @param <T> type of the compared elements.
+ */
+public class TieBreakingPriorityComparator<T> implements Comparator<T>, PriorityComparator<T> {
+
+ /** The {@link PriorityComparator} to which we delegate in a first step. */
+ @Nonnull
+ private final PriorityComparator<T> priorityComparator;
+
+ /** Serializer for instances of the compared objects. */
+ @Nonnull
+ private final TypeSerializer<T> serializer;
+
+ /** Stream that we use in serialization. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outStream;
+
+ /** {@link org.apache.flink.core.memory.DataOutputView} around outStream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outView;
+
+ public TieBreakingPriorityComparator(
+ @Nonnull PriorityComparator<T> priorityComparator,
+ @Nonnull TypeSerializer<T> serializer,
+ @Nonnull ByteArrayOutputStreamWithPos outStream,
+ @Nonnull DataOutputViewStreamWrapper outView) {
+
+ this.priorityComparator = priorityComparator;
+ this.serializer = serializer;
+ this.outStream = outStream;
+ this.outView = outView;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compare(T o1, T o2) {
+
+ // first we compare priority, this should be the most commonly hit case
+ int cmp = priorityComparator.comparePriority(o1, o2);
+
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ // here we start tie breaking and do our best to comply with the compareTo/equals contract, first we try
+ // to simply find an existing way to fully compare.
+ if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
+ return ((Comparable<T>) o1).compareTo(o2);
+ }
+
+ // we catch this case before moving to more expensive tie breaks.
+ if (o1.equals(o2)) {
+ return 0;
+ }
+
+ // if objects are not equal, their serialized form should somehow differ as well. this can be costly, and...
+ // TODO we should have an alternative approach in the future, e.g. a cache that does not rely on compare to check equality.
+ try {
+ outStream.reset();
+ serializer.serialize(o1, outView);
+ int leftLen = outStream.getPosition();
+ serializer.serialize(o2, outView);
+ int rightLen = outStream.getPosition() - leftLen;
+ return compareBytes(outStream.getBuf(), 0, leftLen, leftLen, rightLen);
+ } catch (IOException ex) {
+ throw new FlinkRuntimeException("Serializer problem in comparator.", ex);
+ }
+ }
+
+ @Override
+ public int comparePriority(T left, T right) {
+ return priorityComparator.comparePriority(left, right);
+ }
+
+ public static int compareBytes(byte[] bytes, int offLeft, int leftLen, int offRight, int rightLen) {
+ int maxLen = Math.min(leftLen, rightLen);
+ for (int i = 0; i < maxLen; ++i) {
+ int cmp = bytes[offLeft + i] - bytes[offRight + i];
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return leftLen - rightLen;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 637effd..ad1581b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.LoggerFactory;
@@ -457,6 +458,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
TaskStateManager taskStateManager = env.getTaskStateManager();
LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
+ HeapPriorityQueueSetFactory priorityQueueSetFactory =
+ new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
return new HeapKeyedStateBackend<>(
kvStateRegistry,
@@ -466,7 +469,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
keyGroupRange,
isUsingAsynchronousSnapshots(),
env.getExecutionConfig(),
- localRecoveryConfig);
+ localRecoveryConfig,
+ priorityQueueSetFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 771315d..6dc8cf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -27,6 +27,8 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
/**
* This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
@@ -76,6 +78,15 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
return orderedCache.peekFirst();
}
+ @Override
+ public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull Consumer<E> consumer) {
+ E element;
+ while ((element = peek()) != null && canConsume.test(element)) {
+ poll();
+ consumer.accept(element);
+ }
+ }
+
@Nullable
@Override
public E poll() {
@@ -158,7 +169,11 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
@Nonnull
@Override
public CloseableIterator<E> iterator() {
- return orderedStore.orderedIterator();
+ if (storeOnlyElements) {
+ return orderedStore.orderedIterator();
+ } else {
+ return orderedCache.orderedIterator();
+ }
}
@Override
@@ -184,7 +199,7 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
}
storeOnlyElements = iterator.hasNext();
} catch (Exception e) {
- throw new FlinkRuntimeException("Exception while closing RocksDB iterator.", e);
+ throw new FlinkRuntimeException("Exception while refilling store from iterator.", e);
}
}
}
@@ -249,6 +264,13 @@ public class CachingInternalPriorityQueueSet<E> implements InternalPriorityQueue
*/
@Nullable
E peekLast();
+
+ /**
+ * Returns an iterator over the store that returns elements in order. The iterator must be closed by the client
+ * after usage.
+ */
+ @Nonnull
+ CloseableIterator<E> orderedIterator();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 82ce584..b5b2626 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -44,13 +44,17 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
@@ -102,6 +106,21 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+
+ return priorityQueueSetFactory.create(
+ stateName,
+ byteOrderedElementSerializer,
+ elementPriorityComparator,
+ keyExtractor);
+ }
+
private interface StateFactory {
<K, N, SV, S extends State, IS extends S> IS createState(
StateDescriptor<S, SV> stateDesc,
@@ -137,6 +156,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*/
private final HeapSnapshotStrategy snapshotStrategy;
+ /**
+ * Factory for state that is organized as priority queue.
+ */
+ private final PriorityQueueSetFactory priorityQueueSetFactory;
+
public HeapKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
@@ -145,7 +169,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyGroupRange keyGroupRange,
boolean asynchronousSnapshots,
ExecutionConfig executionConfig,
- LocalRecoveryConfig localRecoveryConfig) {
+ LocalRecoveryConfig localRecoveryConfig,
+ PriorityQueueSetFactory priorityQueueSetFactory) {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
@@ -157,6 +182,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait);
LOG.info("Initializing heap keyed state backend with stream factory.");
this.restoredKvStateMetaInfos = new HashMap<>();
+ this.priorityQueueSetFactory = priorityQueueSetFactory;
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
index 7017905..e5f610e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.util.CloseableIterator;
import javax.annotation.Nonnegative;
@@ -27,9 +28,10 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
@@ -56,9 +58,9 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In
private static final int QUEUE_HEAD_INDEX = 1;
/**
- * Comparator for the contained elements.
+ * Comparator for the priority of contained elements.
*/
- private final Comparator<T> elementComparator;
+ private final PriorityComparator<T> elementPriorityComparator;
/**
* The array that represents the heap-organized priority queue.
@@ -73,19 +75,28 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In
/**
* Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
*
- * @param elementComparator comparator for the contained elements.
+ * @param elementPriorityComparator comparator for the priority of contained elements.
* @param minimumCapacity the minimum and initial capacity of this priority queue.
*/
@SuppressWarnings("unchecked")
public HeapPriorityQueue(
- @Nonnull Comparator<T> elementComparator,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
@Nonnegative int minimumCapacity) {
- this.elementComparator = elementComparator;
+ this.elementPriorityComparator = elementPriorityComparator;
this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
}
@Override
+ public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+ T element;
+ while ((element = peek()) != null && canConsume.test(element)) {
+ poll();
+ consumer.accept(element);
+ }
+ }
+
+ @Override
@Nullable
public T poll() {
return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
@@ -227,7 +238,7 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In
final T currentElement = heap[idx];
int parentIdx = idx >>> 1;
- while (parentIdx > 0 && isElementLessThen(currentElement, heap[parentIdx])) {
+ while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
moveElementToIdx(heap[parentIdx], idx);
idx = parentIdx;
parentIdx >>>= 1;
@@ -245,19 +256,19 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In
int secondChildIdx = firstChildIdx + 1;
if (isElementIndexValid(secondChildIdx, heapSize) &&
- isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
firstChildIdx = secondChildIdx;
}
while (isElementIndexValid(firstChildIdx, heapSize) &&
- isElementLessThen(heap[firstChildIdx], currentElement)) {
+ isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {
moveElementToIdx(heap[firstChildIdx], idx);
idx = firstChildIdx;
firstChildIdx = idx << 1;
secondChildIdx = firstChildIdx + 1;
if (isElementIndexValid(secondChildIdx, heapSize) &&
- isElementLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
firstChildIdx = secondChildIdx;
}
}
@@ -269,8 +280,8 @@ public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements In
return elementIndex <= heapSize;
}
- private boolean isElementLessThen(T a, T b) {
- return elementComparator.compare(a, b) < 0;
+ private boolean isElementPriorityLessThen(T a, T b) {
+ return elementPriorityComparator.comparePriority(a, b) < 0;
}
private void moveElementToIdx(T element, int idx) {
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
index 61313e9..79f319c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
@@ -18,20 +18,17 @@
package org.apache.flink.runtime.state.heap;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.List;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -49,7 +46,9 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* @param <T> type of the contained elements.
*/
-public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement>
+ extends HeapPriorityQueue<T>
+ implements KeyGroupedInternalPriorityQueue<T> {
/**
* Function to extract the key from contained elements.
@@ -74,7 +73,7 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He
/**
* Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
*
- * @param elementComparator comparator for the contained elements.
+ * @param elementPriorityComparator comparator for the priority of contained elements.
* @param keyExtractor function to extract a key from the contained elements.
* @param minimumCapacity the minimum and initial capacity of this priority queue.
* @param keyGroupRange the key-group range of the elements in this set.
@@ -82,13 +81,13 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He
*/
@SuppressWarnings("unchecked")
public HeapPriorityQueueSet(
- @Nonnull Comparator<T> elementComparator,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
@Nonnull KeyExtractorFunction<T> keyExtractor,
@Nonnegative int minimumCapacity,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int totalNumberOfKeyGroups) {
- super(elementComparator, minimumCapacity);
+ super(elementPriorityComparator, minimumCapacity);
this.keyExtractor = keyExtractor;
@@ -147,28 +146,9 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He
}
}
- /**
- * Returns an unmodifiable set of all elements in the given key-group.
- */
- @Nonnull
- public Set<T> getElementsForKeyGroup(@Nonnegative int keyGroupIdx) {
- return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet());
- }
-
- @VisibleForTesting
- @SuppressWarnings("unchecked")
- @Nonnull
- public List<Set<T>> getElementsByKeyGroup() {
- List<Set<T>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length);
- for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) {
- result.add(i, Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet()));
- }
- return result;
- }
-
private HashMap<T, T> getDedupMapForKeyGroup(
- @Nonnegative int keyGroupIdx) {
- return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)];
+ @Nonnegative int keyGroupId) {
+ return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
}
private HashMap<T, T> getDedupMapForElement(T element) {
@@ -182,4 +162,10 @@ public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends He
checkArgument(keyGroupRange.contains(keyGroup));
return keyGroup - keyGroupRange.getStartKeyGroup();
}
+
+ @Nonnull
+ @Override
+ public Set<T> getSubsetForKeyGroup(int keyGroupId) {
+ return getDedupMapForKeyGroup(keyGroupId).keySet();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
new file mode 100644
index 0000000..ee6fda9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Factory that returns a new {@link HeapPriorityQueueSet} from every call to {@code create}, built with the key-group range, total number of key-groups and minimum capacity given at construction; the state name and the serializer passed to {@code create} are not used.
+ */
+public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
+
+ @Nonnull
+ private final KeyGroupRange keyGroupRange;
+
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ @Nonnegative
+ private final int minimumCapacity;
+
+ public HeapPriorityQueueSetFactory(
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups,
+ @Nonnegative int minimumCapacity) {
+
+ this.keyGroupRange = keyGroupRange;
+ this.totalKeyGroups = totalKeyGroups;
+ this.minimumCapacity = minimumCapacity;
+ }
+
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ return new HeapPriorityQueueSet<>(
+ elementPriorityComparator,
+ keyExtractor,
+ minimumCapacity,
+ keyGroupRange,
+ totalKeyGroups);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
index af4d54f..6f4f911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -22,7 +22,10 @@ import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import javax.annotation.Nonnegative;
@@ -30,7 +33,10 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
/**
* This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
@@ -41,7 +47,7 @@ import java.util.Comparator;
* @param <PQ> type type of sub-queue used for each key-group partition.
*/
public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
- implements InternalPriorityQueue<T> {
+ implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> {
/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
@Nonnull
@@ -66,7 +72,7 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
@SuppressWarnings("unchecked")
public KeyGroupPartitionedPriorityQueue(
@Nonnull KeyExtractorFunction<T> keyExtractor,
- @Nonnull Comparator<T> elementComparator,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
@Nonnull KeyGroupRange keyGroupRange,
@Nonnegative int totalKeyGroups) {
@@ -76,16 +82,25 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
this.keyGroupedHeaps = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>(
- new InternalPriorityQueueComparator<>(elementComparator),
+ new InternalPriorityQueueComparator<>(elementPriorityComparator),
keyGroupRange.getNumberOfKeyGroups());
for (int i = 0; i < keyGroupedHeaps.length; i++) {
final PQ keyGroupSubHeap =
- orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
+ orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementPriorityComparator);
keyGroupedHeaps[i] = keyGroupSubHeap;
heapOfkeyGroupedHeaps.add(keyGroupSubHeap);
}
}
+ @Override
+ public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> consumer) {
+ T element;
+ while ((element = peek()) != null && canConsume.test(element)) {
+ poll();
+ consumer.accept(element);
+ }
+ }
+
@Nullable
@Override
public T poll() {
@@ -173,9 +188,28 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
private int computeKeyGroupIndex(T element) {
final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+ return globalKeyGroupToLocalIndex(keyGroupId);
+ }
+
+ private int globalKeyGroupToLocalIndex(int keyGroupId) {
return keyGroupId - firstKeyGroup;
}
+ @Nonnull
+ @Override
+ public Set<T> getSubsetForKeyGroup(int keyGroupId) {
+ HashSet<T> result = new HashSet<>();
+ PQ partitionQueue = keyGroupedHeaps[globalKeyGroupToLocalIndex(keyGroupId)];
+ try (CloseableIterator<T> iterator = partitionQueue.iterator()) {
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Exception while iterating key group.", e);
+ }
+ return result;
+ }
+
/**
* Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements.
* Using code must {@link #close()} after usage.
@@ -236,24 +270,24 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
* @param <Q> type of queue.
*/
private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
- implements Comparator<Q> {
+ implements PriorityComparator<Q> {
/** Comparator for the queue elements, so we can compare their heads. */
@Nonnull
- private final Comparator<T> elementComparator;
+ private final PriorityComparator<T> elementPriorityComparator;
- InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
- this.elementComparator = elementComparator;
+ InternalPriorityQueueComparator(@Nonnull PriorityComparator<T> elementPriorityComparator) {
+ this.elementPriorityComparator = elementPriorityComparator;
}
@Override
- public int compare(Q o1, Q o2) {
+ public int comparePriority(Q o1, Q o2) {
final T left = o1.peek();
final T right = o2.peek();
if (left == null) {
return (right == null ? 0 : 1);
} else {
- return (right == null ? -1 : elementComparator.compare(left, right));
+ return (right == null ? -1 : elementPriorityComparator.comparePriority(left, right));
}
}
}
@@ -271,10 +305,13 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueu
*
* @param keyGroupId the key-group of the elements managed by the produced queue.
* @param numKeyGroups the total number of key-groups in the job.
- * @param elementComparator the comparator that determines the order of the managed elements.
+ * @param elementPriorityComparator the comparator that determines the order of managed elements by priority.
* @return a new queue for the given key-group.
*/
@Nonnull
- PQS create(@Nonnegative int keyGroupId, @Nonnegative int numKeyGroups, @Nonnull Comparator<T> elementComparator);
+ PQS create(
+ @Nonnegative int keyGroupId,
+ @Nonnegative int numKeyGroups,
+ @Nonnull PriorityComparator<T> elementPriorityComparator);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
index 0e7d9dd..14c281e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
@@ -125,4 +126,10 @@ public class TreeOrderedSetCache<E> implements CachingInternalPriorityQueueSet.O
public E peekLast() {
return !avlTree.isEmpty() ? avlTree.last() : null;
}
+
+ @Nonnull
+ @Override
+ public CloseableIterator<E> orderedIterator() {
+ return CloseableIterator.adapterForIterator(avlTree.iterator());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 3da60e4..d78944c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nullable;
@@ -309,7 +310,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
TaskKvStateRegistry kvStateRegistry) {
TaskStateManager taskStateManager = env.getTaskStateManager();
-
+ HeapPriorityQueueSetFactory priorityQueueSetFactory =
+ new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
@@ -318,7 +320,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
keyGroupRange,
isUsingAsynchronousSnapshots(),
env.getExecutionConfig(),
- taskStateManager.createLocalRecoveryConfig());
+ taskStateManager.createLocalRecoveryConfig(),
+ priorityQueueSetFactory);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index c0c3ba4..0cd551c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -51,8 +51,16 @@ public abstract class InternalPriorityQueueTestBase extends TestLogger {
protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
- protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
- Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+ protected static final PriorityComparator<TestElement> TEST_ELEMENT_PRIORITY_COMPARATOR =
+ (left, right) -> Long.compare(left.getPriority(), right.getPriority());
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR = (o1, o2) -> {
+ int priorityCmp = TEST_ELEMENT_PRIORITY_COMPARATOR.comparePriority(o1, o2);
+ if (priorityCmp != 0) {
+ return priorityCmp;
+ }
+ // to fully comply with compareTo/equals contract.
+ return Long.compare(o1.getKey(), o2.getKey());
+ };
protected static void insertRandomElements(
@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 3c06b71..dfcdffc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -53,7 +53,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
new KeyGroupRange(0, 15),
true,
executionConfig,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ mock(PriorityQueueSetFactory.class));
try {
Assert.assertTrue(
@@ -75,7 +76,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
new KeyGroupRange(0, 15),
true,
executionConfig,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ mock(PriorityQueueSetFactory.class));
try {
Assert.assertTrue(
@@ -115,7 +117,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
new KeyGroupRange(0, 15),
true,
executionConfig,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ mock(PriorityQueueSetFactory.class));
try {
@@ -156,7 +159,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
new KeyGroupRange(0, 15),
true,
executionConfig,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ mock(PriorityQueueSetFactory.class));
try {
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
index 618da4e..415497d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
@@ -25,7 +25,7 @@ public class HeapPriorityQueueSetTest extends HeapPriorityQueueTest {
@Override
protected HeapPriorityQueueSet<TestElement> newPriorityQueue(int initialCapacity) {
return new HeapPriorityQueueSet<>(
- TEST_ELEMENT_COMPARATOR,
+ TEST_ELEMENT_PRIORITY_COMPARATOR,
KEY_EXTRACTOR_FUNCTION,
initialCapacity,
KEY_GROUP_RANGE,
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
index 8ffb8b8..6ba5a68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
@@ -89,7 +89,7 @@ public class HeapPriorityQueueTest extends InternalPriorityQueueTestBase {
@Override
protected HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
- return new HeapPriorityQueue<>(TEST_ELEMENT_COMPARATOR, initialCapacity);
+ return new HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index bf428dc..cf6aef4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -49,14 +49,18 @@ public abstract class HeapStateBackendTestBase {
}
public <K> HeapKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
+ final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 15);
+ final int numKeyGroups = keyGroupRange.getNumberOfKeyGroups();
+
return new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
keySerializer,
HeapStateBackendTestBase.class.getClassLoader(),
- 16,
- new KeyGroupRange(0, 15),
+ numKeyGroups,
+ keyGroupRange,
async,
new ExecutionConfig(),
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
index 277de19..d348e10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
@@ -29,7 +29,7 @@ public class KeyGroupPartitionedPriorityQueueTest extends InternalPriorityQueueT
protected InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
return new KeyGroupPartitionedPriorityQueue<>(
KEY_EXTRACTOR_FUNCTION,
- TEST_ELEMENT_COMPARATOR,
+ TEST_ELEMENT_PRIORITY_COMPARATOR,
newFactory(initialCapacity),
KEY_GROUP_RANGE, KEY_GROUP_RANGE.getNumberOfKeyGroups());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
new file mode 100644
index 0000000..ede45e3
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RockDBBackendOptions {
+
+ /**
+ * Choice of implementation for priority queue state (e.g. timers); the default value is the name of the HEAP type.
+ */
+ public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
+ .key("backend.rocksdb.priority_queue_state_type")
+ .defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
+ .withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are " +
+ "either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
+ RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for an implementation based on RocksDB.");
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 21d2a65..f2430ae 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -58,14 +58,18 @@ import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
@@ -76,7 +80,13 @@ import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TieBreakingPriorityComparator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
+import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
@@ -243,6 +253,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** The snapshot strategy, e.g., if we use full or incremental checkpoints, local state, and so on. */
private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
+ /** Factory for priority queue state. */
+ private PriorityQueueSetFactory priorityQueueFactory;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -255,7 +268,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
boolean enableIncrementalCheckpointing,
- LocalRecoveryConfig localRecoveryConfig
+ LocalRecoveryConfig localRecoveryConfig,
+ RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType
) throws IOException {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
@@ -296,6 +310,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.writeOptions = new WriteOptions().setDisableWAL(true);
+ switch (priorityQueueStateType) { // select the factory that backs priority queue state (e.g. timers)
+ case HEAP:
+ this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
+ break;
+ case ROCKS:
+ this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
+ break;
+ default: // fail fast: silently breaking here would leave priorityQueueFactory null and defer the failure to create(...)
+ throw new IllegalArgumentException("Unknown priority queue state type: " + priorityQueueStateType);
+ }
+
LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
}
@@ -378,6 +403,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
IOUtils.closeQuietly(columnMetaData.f0);
}
+ // ... then close the priority queue related resources ...
+ if (priorityQueueFactory instanceof AutoCloseable) {
+ IOUtils.closeQuietly((AutoCloseable) priorityQueueFactory);
+ }
+
// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);
@@ -394,6 +424,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ // Pure delegation: the concrete factory (heap-based or RocksDB-based) was selected in the constructor.
+ return priorityQueueFactory.create(stateName, byteOrderedElementSerializer, elementComparator, keyExtractor);
+ }
+
private void cleanInstanceBasePath() {
LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
@@ -1290,7 +1331,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer,
stateDesc.getSerializer());
- ColumnFamilyHandle columnFamily = createColumnFamily(stateName);
+ ColumnFamilyHandle columnFamily = createColumnFamily(stateName, db);
stateInfo = Tuple2.of(columnFamily, newMetaInfo);
kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1302,7 +1343,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Creates a column family handle for use with a k/v state.
*/
- private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException {
+ private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) {
byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");
@@ -1312,7 +1353,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
return db.createColumnFamily(columnDescriptor);
} catch (RocksDBException e) {
- throw new IOException("Error creating ColumnFamilyHandle.", e);
+ throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
}
}
@@ -2579,4 +2620,126 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ReadOptions readOptions) {
return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
}
+
+ /**
+ * Encapsulates the logic and resources in connection with creating priority queue state structures. The queues live in a dedicated RocksDB instance in the "pqdb" directory below the backend's instance base path; {@link #close()} closes that instance and deletes the directory.
+ */
+ class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory, AutoCloseable {
+
+ /** Default cache size per key-group; handed to the {@link TreeOrderedSetCache} that is created for each key-group partition. */
+ private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+
+ /** A shared buffer to serialize elements for the priority queue; the same instance is passed to every comparator and store this factory creates. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
+
+ /** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper elementSerializationOutView;
+
+ /** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues; it writes to {@link #pqDb} using the backend's write options. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper writeBatchWrapper;
+
+ /** Map to track all column families created to back priority queues, keyed by state name. */
+ @Nonnull
+ private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
+
+ /** The mandatory default column family, so that we can close it later. */
+ @Nonnull
+ private final ColumnFamilyHandle defaultColumnFamily;
+
+ /** Path of the RocksDB instance that holds the priority queues ("pqdb" below the backend's instance base path). */
+ @Nonnull
+ private final File pqInstanceRocksDBPath;
+
+ /** RocksDB instance that holds the priority queues. */
+ @Nonnull
+ private final RocksDB pqDb;
+
+ RocksDBPriorityQueueSetFactory() throws IOException {
+ this.pqInstanceRocksDBPath = new File(instanceBasePath, "pqdb");
+ if (pqInstanceRocksDBPath.exists()) { // remove leftovers at that path; best effort, a failure is only logged
+ try {
+ FileUtils.deleteDirectory(pqInstanceRocksDBPath);
+ } catch (IOException ex) {
+ LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
+ }
+ }
+ List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+ this.pqDb = openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+ this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
+ this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
+ this.writeBatchWrapper = new RocksDBWriteBatchWrapper(pqDb, writeOptions);
+ this.defaultColumnFamily = columnFamilyHandles.get(0); // presumably openDB puts the default column family handle first - TODO confirm
+ this.priorityQueueColumnFamilies = new HashMap<>();
+ }
+
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+
+ final ColumnFamilyHandle columnFamilyHandle =
+ priorityQueueColumnFamilies.computeIfAbsent( // one column family per state name, created on first request
+ stateName,
+ (name) -> RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+
+ @Nonnull
+ TieBreakingPriorityComparator<T> tieBreakingComparator =
+ new TieBreakingPriorityComparator<>(
+ elementPriorityComparator,
+ byteOrderedElementSerializer,
+ elementSerializationOutStream,
+ elementSerializationOutView);
+
+ return new KeyGroupPartitionedPriorityQueue<>(
+ keyExtractor,
+ elementPriorityComparator,
+ new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
+ @Nonnull
+ @Override
+ public CachingInternalPriorityQueueSet<T> create(
+ int keyGroupId,
+ int numKeyGroups,
+ @Nonnull PriorityComparator<T> elementPriorityComparator) {
+ // note: numKeyGroups and the comparator parameter are not used here; the cache is ordered by tieBreakingComparator
+ CachingInternalPriorityQueueSet.OrderedSetCache<T> cache =
+ new TreeOrderedSetCache<>(tieBreakingComparator, DEFAULT_CACHES_SIZE);
+ CachingInternalPriorityQueueSet.OrderedSetStore<T> store =
+ new RocksDBOrderedSetStore<>(
+ keyGroupId,
+ keyGroupPrefixBytes,
+ pqDb,
+ columnFamilyHandle,
+ byteOrderedElementSerializer,
+ elementSerializationOutStream,
+ elementSerializationOutView,
+ writeBatchWrapper);
+
+ return new CachingInternalPriorityQueueSet<>(cache, store);
+ }
+ },
+ keyGroupRange,
+ numberOfKeyGroups);
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(writeBatchWrapper); // batch wrapper and column family handles are closed before the DB itself
+ for (ColumnFamilyHandle columnFamilyHandle : priorityQueueColumnFamilies.values()) {
+ IOUtils.closeQuietly(columnFamilyHandle);
+ }
+ IOUtils.closeQuietly(defaultColumnFamily);
+ IOUtils.closeQuietly(pqDb);
+ try { // finally remove the on-disk files; a failure is only logged
+ FileUtils.deleteDirectory(pqInstanceRocksDBPath);
+ } catch (IOException ex) {
+ LOG.warn("Could not delete instance path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index e512933..5284314 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -61,10 +60,6 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
@Nonnull
private final ColumnFamilyHandle columnFamilyHandle;
- /** Read options for RocksDB. */
- @Nonnull
- private final ReadOptions readOptions;
-
/**
* Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
* aligned with their logical order.
@@ -93,14 +88,12 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
@Nonnegative int keyGroupPrefixBytes,
@Nonnull RocksDB db,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
- @Nonnull ReadOptions readOptions,
@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
@Nonnull ByteArrayOutputStreamWithPos outputStream,
@Nonnull DataOutputViewStreamWrapper outputView,
@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
this.db = db;
this.columnFamilyHandle = columnFamilyHandle;
- this.readOptions = readOptions;
this.byteOrderProducingSerializer = byteOrderProducingSerializer;
this.outputStream = outputStream;
this.outputView = outputView;
@@ -169,7 +162,7 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
return new RocksToJavaIteratorAdapter(
new RocksIteratorWrapper(
- db.newIterator(columnFamilyHandle, readOptions)));
+ db.newIterator(columnFamilyHandle)));
}
/**
@@ -232,6 +225,10 @@ public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSe
private RocksToJavaIteratorAdapter(@Nonnull RocksIteratorWrapper iterator) {
this.iterator = iterator;
try {
+ // TODO we could check if it is more efficient to make the seek more specific, e.g. with a provided hint
+ // that is lexicographically closer to the first expected element in the key-group. I wonder if this could
+ // help to improve the seek if there are many tombstones for elements at the beginning of the key-group
+ // (like for elements that have been removed in previous polling, before they are compacted away).
iterator.seek(groupPrefixBytes);
deserializeNextElementIfAvailable();
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 81d6265..998521b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
+import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -76,6 +77,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
+ /**
+ * The options to choose from for the type of priority queue state.
+ */
+ public enum PriorityQueueStateType {
+ HEAP, // heap-based implementation (the default)
+ ROCKS // implementation based on RocksDB
+ }
+
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
@@ -109,6 +118,9 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
/** This determines if incremental checkpointing is enabled. */
private final TernaryBoolean enableIncrementalCheckpointing;
+ /** This determines the type of priority queue state. */
+ private final PriorityQueueStateType priorityQueueStateType;
+
// -- runtime values, set on TaskManager when initializing / using the backend
/** Base paths for RocksDB directory, as initialized. */
@@ -221,6 +233,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+ // for now, we still use the heap-based implementation as default
+ this.priorityQueueStateType = PriorityQueueStateType.HEAP;
}
/**
@@ -256,6 +270,11 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
+ final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+
+ this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
+ PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
+
// configure local directories
if (original.localRocksDbDirectories != null) {
this.localRocksDbDirectories = original.localRocksDbDirectories;
@@ -422,7 +441,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
keyGroupRange,
env.getExecutionConfig(),
isIncrementalCheckpointsEnabled(),
- localRecoveryConfig);
+ localRecoveryConfig,
+ priorityQueueStateType);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
index ae20cf2..5f26835 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
@@ -57,7 +57,6 @@ public class CachingInternalPriorityQueueSetWithRocksDBStoreTest extends Caching
prefixBytes,
rocksDBResource.getRocksDB(),
rocksDBResource.getDefaultColumnFamily(),
- rocksDBResource.getReadOptions(),
TestElementSerializer.INSTANCE,
outputStream,
outputView,
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
index 256a83b..0b1d07b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
@@ -124,7 +124,6 @@ public class RocksDBOrderedSetStoreTest {
keyGroupPrefixBytes,
rocksDBResource.getRocksDB(),
rocksDBResource.getDefaultColumnFamily(),
- rocksDBResource.getReadOptions(),
byteOrderSerializer,
outputStreamWithPos,
outputView,
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index ad89583..69069d6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -240,7 +240,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
new KeyGroupRange(0, 0),
new ExecutionConfig(),
enableIncrementalCheckpointing,
- TestLocalRecoveryConfig.disabled());
+ TestLocalRecoveryConfig.disabled(),
+ RocksDBStateBackend.PriorityQueueStateType.HEAP);
verify(columnFamilyOptions, Mockito.times(1))
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9915dd5..797a26a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -730,9 +730,11 @@ public abstract class AbstractStreamOperator<OUT>
checkTimerServiceInitialization();
// the following casting is to overcome type restrictions.
- TypeSerializer<K> keySerializer = (TypeSerializer<K>) getKeyedStateBackend().getKeySerializer();
+ KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
+ TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
- return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
+ TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
+ return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
}
public void processWatermark(Watermark mark) throws Exception {