Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/29 08:54:47 UTC
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6228
[FLINK-9491] Implement timer data structure based on RocksDB
## What is the purpose of the change
This PR is another step towards integrating the timer state with the keyed state backends.
First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue` so that the functionality of a heap-set-organized state is decoupled from storing timers. The main reason for this is that state/backend related code lives in flink-runtime and timers are a concept from flink-streaming.
Second, the PR also introduces an implementation of `InternalPriorityQueue` with set semantics (i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB is always partitioned into key-groups, so the general idea is to organize the implementation as a heap-of-heaps, where each sub-heap holds elements from exactly one key-group and the super-heap merges them by priority across key-group boundaries. The implementation reuses the in-memory implementation of `InternalPriorityQueue` (without set properties) as the super-heap that holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`, consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store (`OrderedSetStore`), currently applying simple write-through synchronization between cache and store. In the current implementation, the cache is based on an AVL tree and restricted in capacity. The store is backed by a RocksDB column family.
We utilize caching to reduce read accesses to RocksDB.
Please note that the RocksDB implementation is currently not yet integrated with the timer service or the backend. This will happen in the next steps.
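To make the heap-of-heaps idea concrete, here is a minimal in-memory sketch. It is not the code from this PR: the names `HeapOfHeapsSketch` and `Partition` are hypothetical, elements are plain `long` values, the key-group is computed with a simple modulo as a stand-in for Flink's key-group assignment, and `java.util.PriorityQueue` with remove and re-add stands in for adjusting a modified sub-heap in place.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.TreeSet;

class HeapOfHeapsSketch {

    /** One sub-heap per key-group. Uses identity equality, so the super-heap can find exactly this object. */
    static final class Partition {
        final TreeSet<Long> elements = new TreeSet<>();

        Long head() {
            return elements.isEmpty() ? null : elements.first();
        }
    }

    private final Partition[] partitions;
    private final PriorityQueue<Partition> superHeap;

    HeapOfHeapsSketch(int numKeyGroups) {
        // Sub-heaps are ordered by their head element; empty sub-heaps sort last.
        Comparator<Long> nullsLast = Comparator.nullsLast(Comparator.<Long>naturalOrder());
        Comparator<Partition> byHead = (a, b) -> nullsLast.compare(a.head(), b.head());
        this.partitions = new Partition[numKeyGroups];
        this.superHeap = new PriorityQueue<>(numKeyGroups, byHead);
        for (int i = 0; i < numKeyGroups; i++) {
            partitions[i] = new Partition();
            superHeap.add(partitions[i]);
        }
    }

    /** Adds the element to the sub-heap of its key-group; returns false for duplicates. */
    boolean add(long element) {
        // Assumption: modulo stands in for the real key-group assignment.
        Partition p = partitions[(int) Math.floorMod(element, (long) partitions.length)];
        superHeap.remove(p);              // take the sub-heap out before its head can change
        boolean added = p.elements.add(element);
        superHeap.add(p);                 // re-insert so the super-heap sees the new head
        return added;
    }

    /** Returns the globally smallest element without removing it, or null if empty. */
    Long peek() {
        return superHeap.peek().head();
    }

    /** Removes and returns the globally smallest element, or null if empty. */
    Long poll() {
        Partition p = superHeap.poll();   // sub-heap with the smallest head
        Long head = p.elements.pollFirst();
        superHeap.add(p);
        return head;
    }

    public static void main(String[] args) {
        HeapOfHeapsSketch queue = new HeapOfHeapsSketch(3);
        for (long element : new long[] {7, 2, 9, 4}) {
            queue.add(element);
        }
        Long next;
        while ((next = queue.poll()) != null) {
            System.out.println(next);
        }
    }
}
```

Elements come out in global priority order even though each one is stored only in the sub-heap of its own key-group.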
## Brief change log
- Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
- Split the data-structure into a hierarchy, a heap without set-semantics (`HeapPriorityQueue`) and a heap extended with set-semantics (`HeapPriorityQueueSet`).
- Introduced an implementation of RocksDB-based `InternalPriorityQueue` with set-semantics. The starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue` of `CachingInternalPriorityQueueSet` instances, each of which contains elements for exactly one key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet` to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore`.
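
The pairing of a bounded cache with an unbounded store can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's `CachingInternalPriorityQueueSet`: the class name `WriteThroughOrderedSetSketch` is hypothetical, a `TreeSet` stands in for the RocksDB-backed store, elements are plain `long` values, and the refill policy (reload the smallest elements once the cache runs empty) is one plausible choice rather than the documented behaviour.

```java
import java.util.Iterator;
import java.util.TreeSet;

class WriteThroughOrderedSetSketch {

    // Stand-in for the "slow", unbounded store (a RocksDB column family in the PR).
    private final TreeSet<Long> store = new TreeSet<>();
    // "Fast", bounded cache; it always holds the smallest elements of the store.
    private final TreeSet<Long> cache = new TreeSet<>();
    private final int capacity;
    // True if the store may hold elements that are not in the cache.
    private boolean storeOnlyElements = false;
    // Counts how often the cache had to be refilled from the store.
    int refills = 0;

    WriteThroughOrderedSetSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1");
        }
        this.capacity = capacity;
    }

    /** Write-through add with set semantics; returns false for duplicates. */
    boolean add(long element) {
        if (!store.add(element)) {
            return false;
        }
        boolean belongsInCache = storeOnlyElements
            // Store has extra elements: only values below the cache's maximum keep the cache a prefix.
            ? !cache.isEmpty() && element < cache.last()
            // Cache mirrors the store: accept while there is room or the value beats the maximum.
            : cache.size() < capacity || element < cache.last();
        if (belongsInCache) {
            cache.add(element);
            if (cache.size() > capacity) {
                cache.pollLast();          // evict the largest; it stays in the store
                storeOnlyElements = true;
            }
        } else {
            storeOnlyElements = true;
        }
        return true;
    }

    /** Write-through remove. */
    boolean remove(long element) {
        cache.remove(element);
        return store.remove(element);
    }

    /** Reads the head from the cache and touches the store only when the cache ran empty. */
    Long peek() {
        if (cache.isEmpty() && storeOnlyElements) {
            refill();
        }
        return cache.isEmpty() ? null : cache.first();
    }

    Long poll() {
        Long head = peek();
        if (head != null) {
            cache.pollFirst();
            store.remove(head);
        }
        return head;
    }

    private void refill() {
        refills++;
        Iterator<Long> it = store.iterator();
        while (it.hasNext() && cache.size() < capacity) {
            cache.add(it.next());
        }
        storeOnlyElements = it.hasNext();
    }

    public static void main(String[] args) {
        WriteThroughOrderedSetSketch set = new WriteThroughOrderedSetSketch(2);
        for (long element : new long[] {5, 3, 9, 1}) {
            set.add(element);
        }
        Long next;
        while ((next = set.poll()) != null) {
            System.out.println(next + " (refills so far: " + set.refills + ")");
        }
    }
}
```

Every write goes to both layers, so the store is always complete; reads are served from the cache, and the store is scanned only when the cache has been drained.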
## Verifying this change
I added dedicated tests for all data structures.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes, fastutil)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6228.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6228
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s....@...>
Date: 2018-06-19T08:01:30Z
Introduce MAX_ARRAY_SIZE as general constant
commit dd64cbb7eb15317a4dc8f0626c50dffd58e6b5f9
Author: Stefan Richter <s....@...>
Date: 2018-06-18T12:38:01Z
Generalization of timer queue to a queue(set) that is no longer coupled to timers and implementation for RocksDB
----
---
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199816139
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+ @Nonnull
+ private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+ /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupLists;
+
+ /** Function to extract the key from contained elements. */
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /** The total number of key-groups (in the job). */
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ /** The smallest key-group id with a subpartition managed by this ordered set. */
+ @Nonnegative
+ private final int firstKeyGroup;
+
+ @SuppressWarnings("unchecked")
+ public KeyGroupPartitionedPriorityQueue(
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.keyExtractor = keyExtractor;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+ this.keyGroupHeap = new HeapPriorityQueue<>(
+ new InternalPriorityQueueComparator<>(elementComparator),
+ keyGroupRange.getNumberOfKeyGroups());
+ for (int i = 0; i < keyGroupLists.length; i++) {
+ final PQ keyGroupCache =
+ orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
+ keyGroupLists[i] = keyGroupCache;
+ keyGroupHeap.add(keyGroupCache);
+ }
+ }
+
+ @Nullable
+ @Override
+ public T poll() {
+ final PQ headList = keyGroupHeap.peek();
+ final T head = headList.poll();
+ keyGroupHeap.adjustModifiedElement(headList);
+ return head;
+ }
+
+ @Nullable
+ @Override
+ public T peek() {
+ return keyGroupHeap.peek().peek();
+ }
+
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ final PQ list = getListForElementKeyGroup(toAdd);
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.add(toAdd)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toAdd.equals(peek());
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ final PQ list = getListForElementKeyGroup(toRemove);
+
+ final T oldHead = peek();
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.remove(toRemove)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toRemove.equals(oldHead);
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public int size() {
+ int sizeSum = 0;
+ for (PQ list : keyGroupLists) {
+ sizeSum += list.size();
+ }
+ return sizeSum;
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+ if (toAdd == null) {
+ return;
+ }
+
+ // TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
+ for (T element : toAdd) {
+ add(element);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public CloseableIterator<T> iterator() {
+ return new KeyGroupConcatenationIterator<>(keyGroupLists);
+ }
+
+ private PQ getListForElementKeyGroup(T element) {
+ return keyGroupLists[computeKeyGroupIndex(element)];
+ }
+
+ private int computeKeyGroupIndex(T element) {
+ final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
+ final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+ return keyGroupId - firstKeyGroup;
+ }
+
+ /**
+ * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements.
+ * Using code must {@link #close()} after usage.
+ *
+ * @param <T> the type of iterated elements.
+ */
+ private static final class KeyGroupConcatenationIterator<
+ T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements CloseableIterator<T> {
+
+ /** Array with the subpartitions that we iterate. No null values in the array. */
+ @Nonnull
+ private final PQS[] keyGroupLists;
+
+ /** The subpartition that is currently iterated. */
+ @Nonnegative
+ private int index;
+
+ /** The iterator of the current subpartition. */
+ @Nonnull
+ private CloseableIterator<T> current;
+
+ private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
+ this.keyGroupLists = keyGroupLists;
+ this.index = 0;
+ this.current = CloseableIterator.empty();
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean currentHasNext = current.hasNext();
+
+ // find the iterator of the next partition that has elements.
+ while (!currentHasNext && index < keyGroupLists.length) {
+ IOUtils.closeQuietly(current);
+ current = keyGroupLists[index++].iterator();
+ currentHasNext = current.hasNext();
+ }
+ return currentHasNext;
+ }
+
+ @Override
+ public T next() {
+ return current.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ current.close();
+ }
+ }
+
+ /**
+ * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
+ * from {@link #peek()}.
+ *
+ * @param <T> type of the elements in the compared queues.
+ * @param <Q> type of queue.
+ */
+ private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
+ implements Comparator<Q> {
+
+ /** Comparator for the queue elements, so we can compare their heads. */
+ @Nonnull
+ private final Comparator<T> elementComparator;
+
+ InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
+ this.elementComparator = elementComparator;
+ }
+
+ @Override
+ public int compare(Q o1, Q o2) {
+ final T leftTimer = o1.peek();
+ final T rightTimer = o2.peek();
--- End diff --
👍
---
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6228
---
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199816552
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
--- End diff --
👍
---
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199812541
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Function to extract a key from a given object.
+ *
+ * @param <T> type of the element from which we extract the key.
+ */
+@FunctionalInterface
+public interface KeyExtractorFunction<T> {
--- End diff --
I find it useful when concepts have names attached to them and some form of typing; otherwise, you end up with a lot of `Function<>` objects and have to think twice about their concrete use case.
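A small sketch of the difference, with hypothetical names (`KeyExtractor`, `PriorityExtractor`, `NamedFunctionSketch`) that are not taken from the PR: with generic `Function<>` parameters, only the parameter names distinguish the two roles, whereas dedicated functional interfaces give each role its own type.

```java
import java.util.function.Function;

class NamedFunctionSketch {

    /** Hypothetical named concept: extracts the partitioning key from an element. */
    @FunctionalInterface
    interface KeyExtractor<T> {
        Object extractKey(T element);
    }

    /** Hypothetical named concept: extracts the priority used for ordering. */
    @FunctionalInterface
    interface PriorityExtractor<T> {
        long extractPriority(T element);
    }

    // Both roles share one type here, so two Function variables can be swapped
    // by accident and the call still compiles.
    static <T> String describeGeneric(T e, Function<T, Object> key, Function<T, Object> priority) {
        return key.apply(e) + "@" + priority.apply(e);
    }

    // Each role has its own type here: a KeyExtractor variable cannot be passed
    // where a PriorityExtractor is expected.
    static <T> String describeNamed(T e, KeyExtractor<T> key, PriorityExtractor<T> priority) {
        return key.extractKey(e) + "@" + priority.extractPriority(e);
    }

    public static void main(String[] args) {
        Function<String, Object> genericKey = t -> t;
        Function<String, Object> genericPriority = t -> t.length();
        KeyExtractor<String> key = t -> t;
        PriorityExtractor<String> priority = String::length;

        System.out.println(describeGeneric("user-42", genericKey, genericPriority));
        System.out.println(describeNamed("user-42", key, priority));
    }
}
```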
---
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199325172
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+ @Nonnull
+ private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+ /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupLists;
+
+ /** Function to extract the key from contained elements. */
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /** The total number of key-groups (in the job). */
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ /** The smallest key-group id with a subpartition managed by this ordered set. */
+ @Nonnegative
+ private final int firstKeyGroup;
+
+ @SuppressWarnings("unchecked")
+ public KeyGroupPartitionedPriorityQueue(
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.keyExtractor = keyExtractor;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+ this.keyGroupHeap = new HeapPriorityQueue<>(
+ new InternalPriorityQueueComparator<>(elementComparator),
+ keyGroupRange.getNumberOfKeyGroups());
+ for (int i = 0; i < keyGroupLists.length; i++) {
+ final PQ keyGroupCache =
+ orderedCacheFactory.create(firstKeyGroup + i, elementComparator);
+ keyGroupLists[i] = keyGroupCache;
+ keyGroupHeap.add(keyGroupCache);
+ }
+ }
+
+ @Nullable
+ @Override
+ public T poll() {
+ final PQ headList = keyGroupHeap.peek();
+ final T head = headList.poll();
+ keyGroupHeap.adjustModifiedElement(headList);
+ return head;
+ }
+
+ @Nullable
+ @Override
+ public T peek() {
+ return keyGroupHeap.peek().peek();
+ }
+
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ final PQ list = getListForElementKeyGroup(toAdd);
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.add(toAdd)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toAdd.equals(peek());
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ final PQ list = getListForElementKeyGroup(toRemove);
+
+ final T oldHead = peek();
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.remove(toRemove)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toRemove.equals(oldHead);
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public int size() {
+ int sizeSum = 0;
+ for (PQ list : keyGroupLists) {
+ sizeSum += list.size();
+ }
+ return sizeSum;
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+ if (toAdd == null) {
+ return;
+ }
+
+ // TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
+ for (T element : toAdd) {
+ add(element);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public CloseableIterator<T> iterator() {
+ return new KeyGroupConcatenationIterator<>(keyGroupLists);
+ }
+
+ private PQ getListForElementKeyGroup(T element) {
+ return keyGroupLists[computeKeyGroupIndex(element)];
+ }
+
+ private int computeKeyGroupIndex(T element) {
+ final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
+ final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+ return keyGroupId - firstKeyGroup;
+ }
+
+ /**
+ * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements.
+ * Using code must {@link #close()} after usage.
+ *
+ * @param <T> the type of iterated elements.
+ */
+ private static final class KeyGroupConcatenationIterator<
+ T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements CloseableIterator<T> {
+
+ /** Array with the subpartitions that we iterate. No null values in the array. */
+ @Nonnull
+ private final PQS[] keyGroupLists;
+
+ /** The subpartition that is currently iterated. */
+ @Nonnegative
+ private int index;
+
+ /** The iterator of the current subpartition. */
+ @Nonnull
+ private CloseableIterator<T> current;
+
+ private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
+ this.keyGroupLists = keyGroupLists;
+ this.index = 0;
+ this.current = CloseableIterator.empty();
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean currentHasNext = current.hasNext();
+
+ // find the iterator of the next partition that has elements.
+ while (!currentHasNext && index < keyGroupLists.length) {
+ IOUtils.closeQuietly(current);
+ current = keyGroupLists[index++].iterator();
+ currentHasNext = current.hasNext();
+ }
+ return currentHasNext;
+ }
+
+ @Override
+ public T next() {
+ return current.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ current.close();
+ }
+ }
+
+ /**
+ * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
+ * from {@link #peek()}.
+ *
+ * @param <T> type of the elements in the compared queues.
+ * @param <Q> type of queue.
+ */
+ private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
+ implements Comparator<Q> {
+
+ /** Comparator for the queue elements, so we can compare their heads. */
+ private final Comparator<T> elementComparator;
--- End diff --
elementComparator is `@Nonnull`
---
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199814790
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by hash
+ * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap and the deduplication set.</li>
+ * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
+
+ /**
+ * Function to extract the key from contained elements.
+ */
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /**
+ * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements.
+ */
+ private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
+
+ /**
+ * The key-group range of elements that are managed by this queue.
+ */
+ private final KeyGroupRange keyGroupRange;
+
+ /**
+ * The total number of key-groups of the job.
+ */
+ private final int totalNumberOfKeyGroups;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param keyExtractor function to extract a key from the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ * @param keyGroupRange the key-group range of the elements in this set.
+ * @param totalNumberOfKeyGroups the total number of key-groups of the job.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueueSet(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnegative int minimumCapacity,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalNumberOfKeyGroups) {
+
+ super(elementComparator, minimumCapacity);
+
+ this.keyExtractor = keyExtractor;
+
+ this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+ this.keyGroupRange = keyGroupRange;
+
+ final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
+ final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
+ this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
+ for (int i = 0; i < keyGroupsInLocalRange; ++i) {
+ deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
+ }
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ final T toRemove = super.poll();
+ if (toRemove != null) {
+ return getDedupMapForElement(toRemove).remove(toRemove);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
+ * no such element is already contained (determined by {@link #equals(Object)}).
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T element) {
+ return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
+ }
+
+ /**
+ * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
+ * via {@link #equals(Object)}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T elementToRemove) {
+ T storedElement = getDedupMapForElement(elementToRemove).remove(elementToRemove);
+ return storedElement != null && super.remove(storedElement);
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ for (HashMap<?, ?> elementHashMap :
+ deduplicationMapsByKeyGroup) {
--- End diff --
👍
---
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567205
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by hash
+ * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap and the deduplication set.</li>
+ * <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful adding, etc..</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
+
+ /**
+ * Function to extract the key from contained elements.
+ */
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /**
+ * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements.
+ */
+ private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
+
+ /**
+ * The key-group range of elements that are managed by this queue.
+ */
+ private final KeyGroupRange keyGroupRange;
+
+ /**
+ * The total number of key-groups of the job.
+ */
+ private final int totalNumberOfKeyGroups;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param keyExtractor function to extract a key from the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ * @param keyGroupRange the key-group range of the elements in this set.
+ * @param totalNumberOfKeyGroups the total number of key-groups of the job.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueueSet(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnegative int minimumCapacity,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalNumberOfKeyGroups) {
+
+ super(elementComparator, minimumCapacity);
+
+ this.keyExtractor = keyExtractor;
+
+ this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+ this.keyGroupRange = keyGroupRange;
+
+ final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
+ final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
+ this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
+ for (int i = 0; i < keyGroupsInLocalRange; ++i) {
+ deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
+ }
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ final T toRemove = super.poll();
+ if (toRemove != null) {
+ return getDedupMapForElement(toRemove).remove(toRemove);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
+ * no such element is already contained (determined by {@link #equals(Object)}).
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T element) {
+ return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
+ }
+
+ /**
+ * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
+ * via {@link #equals(Object)}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T elementToRemove) {
+ T storedElement = getDedupMapForElement(elementToRemove).remove(elementToRemove);
+ return storedElement != null && super.remove(storedElement);
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ for (HashMap<?, ?> elementHashMap :
+ deduplicationMapsByKeyGroup) {
--- End diff --
Accidental newline?
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199813559
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+ /**
+ * The index of the head element in the array that represents the heap.
+ */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /**
+ * Comparator for the contained elements.
+ */
+ private final Comparator<T> elementComparator;
+
+ /**
+ * The array that represents the heap-organized priority queue.
+ */
+ private T[] queue;
+
+ /**
+ * The current size of the priority queue.
+ */
+ private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnegative int minimumCapacity) {
+
+ this.elementComparator = elementComparator;
+ this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+ }
+
+ @Override
+ @Nullable
+ public T peek() {
+ return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+ }
+
+ /**
+ * Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ return addInternal(toAdd);
+ }
+
+ /**
+ * This remove is based on object identity, not the result of equals.
+ *
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toStop) {
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806702
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
--- End diff --
It could be useful to mix in random add operations while emptying the queue, either in a new test or in the relevant existing tests.
This would check that mixed-in additions do not break the order.
For example, add one element every 10 operations, and only while the size is greater than 10, so that the queue still converges to the empty state.
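A minimal sketch of such a test, assuming `java.util.PriorityQueue<Long>` as a stand-in for the `InternalPriorityQueue` under test and a `TreeSet` as the reference model. The class and method names are hypothetical and not part of the PR:

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.TreeSet;

// Hypothetical sketch: java.util.PriorityQueue stands in for the queue under test.
class MixedAddPollSketch {

    /**
     * Drains the queue while mixing in additions and checks every polled element
     * against a sorted reference set. Returns the number of polls performed.
     */
    static int drainWithMixedAdds(int initialSize, long seed) {
        Random random = new Random(seed);
        PriorityQueue<Long> queue = new PriorityQueue<>(Comparator.naturalOrder());
        TreeSet<Long> reference = new TreeSet<>();

        // Fill both structures with the same unique elements.
        while (reference.size() < initialSize) {
            long value = random.nextLong();
            if (reference.add(value)) {
                queue.add(value);
            }
        }

        int operations = 0;
        while (!queue.isEmpty()) {
            Long polled = queue.poll();
            Long expected = reference.pollFirst();
            if (!polled.equals(expected)) {
                throw new AssertionError("Order broken: expected " + expected + " but polled " + polled);
            }
            ++operations;

            // Mix in one addition every 10 polls, but only while more than 10
            // elements remain, so the queue still converges to the empty state.
            if (operations % 10 == 0 && queue.size() > 10) {
                long value = random.nextLong();
                if (reference.add(value)) {
                    queue.add(value);
                }
            }
        }

        if (!reference.isEmpty()) {
            throw new AssertionError("Queue is empty but the reference set is not");
        }
        return operations;
    }

    public static void main(String[] args) {
        System.out.println("polls: " + drainWithMixedAdds(1000, 42L));
    }
}
```

Comparing against a sorted reference set, rather than only against the last polled priority, keeps the check valid even when a mixed-in element has a lower priority than one that was already polled.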
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806942
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ Assert.assertNull(priorityQueue.poll());
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ Assert.assertNotNull(removed);
+ Assert.assertTrue(checkSet.remove(removed));
+ Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+ lastPriorityValue = removed.getPriority();
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+
+ Assert.assertNull(priorityQueue.poll());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
+ Assert.assertFalse(priorityQueue.isEmpty());
+
+ priorityQueue.poll();
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ @Test
+ public void testBulkAddRestoredTimers() throws Exception {
--- End diff --
testBulkAddRestored**Elements**
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806903
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
--- End diff --
Could be
`long lastPriorityValue = removesHead ? element.getPriority() : Long.MIN_VALUE;`
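The reasoning behind this suggestion, as a hedged sketch: when the removed element was the head, every remaining element must have a priority at least as high, so the removed priority is a tighter lower bound than `Long.MIN_VALUE`. The example below assumes `java.util.PriorityQueue<Long>` as a stand-in for the queue under test; the class and method names are hypothetical:

```java
import java.util.PriorityQueue;

// Hypothetical sketch: java.util.PriorityQueue stands in for the queue under test.
class TighterLowerBoundSketch {

    /**
     * Removes the given element, then drains the queue and checks that no polled
     * element falls below the lower bound. If the removed element was the head,
     * its priority is used as the initial bound instead of Long.MIN_VALUE.
     */
    static boolean remainingRespectsBound(PriorityQueue<Long> queue, Long toRemove) {
        boolean removesHead = toRemove.equals(queue.peek());
        queue.remove(toRemove);

        long lastPriorityValue = removesHead ? toRemove : Long.MIN_VALUE;

        Long next;
        while ((next = queue.poll()) != null) {
            if (next < lastPriorityValue) {
                return false;
            }
            lastPriorityValue = next;
        }
        return true;
    }
}
```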
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199816118
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199325112
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for a collection that gives in-order access to elements w.r.t. their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+ /**
+ * Retrieves and removes the first element (w.r.t. the order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T poll();
+
+ /**
+ * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T peek();
+
+ /**
+ * Adds the given element to the set, if it is not already contained.
+ *
+ * @param toAdd the element to add to the set.
+ * @return <code>true</> if the operation changed the head element or if is it unclear if the head element changed.
--- End diff --
nit: unclosed HTML tag `<code>`
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199814578
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+ /**
+ * The index of the head element in the array that represents the heap.
+ */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /**
+ * Comparator for the contained elements.
+ */
+ private final Comparator<T> elementComparator;
+
+ /**
+ * The array that represents the heap-organized priority queue.
+ */
+ private T[] queue;
+
+ /**
+ * The current size of the priority queue.
+ */
+ private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnegative int minimumCapacity) {
+
+ this.elementComparator = elementComparator;
+ this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+ }
+
+ @Override
+ @Nullable
+ public T peek() {
+ return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+ }
+
+ /**
+ * Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ return addInternal(toAdd);
+ }
+
+ /**
+ * This remove is based on object identity, not the result of equals.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toStop) {
+ return removeInternal(toStop);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ @Nonnegative
+ public int size() {
+ return size;
+ }
+
+ public void clear() {
+ size = 0;
+ Arrays.fill(queue, null);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ @Nonnull
+ public <O> O[] toArray(O[] out) {
--- End diff --
It is aligned with the signature from `Collection::toArray(...)`
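
For readers unfamiliar with that convention, here is a minimal sketch of what a `toArray(O[] out)` with the `Collection::toArray(T[])` contract usually looks like. `ToArraySketch` is a hypothetical stand-in class, not the `HeapPriorityQueue` from this PR, and the body follows the idiom used by `java.util.ArrayList` rather than the code under review.

```java
import java.util.Arrays;

/** Hypothetical container used only to illustrate the toArray contract. */
final class ToArraySketch<T> {
    private final Object[] elements;
    private final int size;

    ToArraySketch(T[] source) {
        // Copy into an Object[] so the sketch does not depend on the caller's array.
        this.elements = Arrays.copyOf(source, source.length, Object[].class);
        this.size = source.length;
    }

    @SuppressWarnings("unchecked")
    <O> O[] toArray(O[] out) {
        if (out.length < size) {
            // Too small: allocate a new array with the runtime type of 'out'.
            return (O[]) Arrays.copyOf(elements, size, out.getClass());
        }
        System.arraycopy(elements, 0, out, 0, size);
        if (out.length > size) {
            // Spare room: null-terminate, as Collection::toArray documents.
            out[size] = null;
        }
        return out;
    }
}
```

The caller decides the runtime array type, which is why the method carries its own type parameter instead of reusing the class-level one.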
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199813529
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199320146
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
+ * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
+ * ordered (partial) view on the top elements in the ordered store. We are currently applying simple write-trough
--- End diff --
typo: `write-trough` -> `write-through`
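
As background on the term: with write-through synchronization every mutation is applied to the store immediately, and the cache only mirrors the top of the store's order. The sketch below illustrates that idea under simplifying assumptions. `WriteThroughSketch` is hypothetical, both the "store" and the "cache" are plain `TreeSet`s, and none of this is the `CachingInternalPriorityQueueSet` code from the PR.

```java
import java.util.TreeSet;

/** Hypothetical write-through pairing of a bounded cache and an unbounded store. */
final class WriteThroughSketch {
    private final TreeSet<Long> store = new TreeSet<>(); // stand-in for the slow store
    private final TreeSet<Long> cache = new TreeSet<>(); // smallest elements only
    private final int cacheCapacity;                     // assumed to be >= 1

    WriteThroughSketch(int cacheCapacity) {
        this.cacheCapacity = cacheCapacity;
    }

    void add(long element) {
        boolean cacheWasComplete = cache.size() == store.size();
        if (!store.add(element)) {
            return; // set semantics: already present, nothing to do
        }
        // Cache the element only if it belongs to the cached head range.
        if (cacheWasComplete || (!cache.isEmpty() && element < cache.last())) {
            cache.add(element);
            if (cache.size() > cacheCapacity) {
                cache.pollLast(); // evict the largest; it is still in the store
            }
        }
    }

    void remove(long element) {
        store.remove(element); // write-through: the store is always updated
        cache.remove(element);
    }

    Long peek() {
        if (cache.isEmpty() && !store.isEmpty()) {
            // Refill the cache with the smallest elements from the store.
            for (Long e : store) {
                if (cache.size() == cacheCapacity) {
                    break;
                }
                cache.add(e);
            }
        }
        return cache.isEmpty() ? null : cache.first();
    }
}
```

Because the store is never behind the cache, evicting from the cache loses nothing, and a refill only needs an ordered scan from the head of the store.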
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199566980
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
--- End diff --
I would describe it as being based on the element's index in the internal array, and say the same in the `remove` doc comment.
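
To make the index-based lookup concrete, here is a small sketch of a binary heap whose elements remember their own slot, so `remove` can jump straight to the position instead of searching. Everything here is hypothetical: `IndexedHeapSketch` and `Node` are illustrative names, the array is 0-based (the PR's heap starts at 1), and `remove` returns whether the element was found rather than the head-changed flag used in the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical min-heap with index-tracking elements. */
final class IndexedHeapSketch {

    /** Element that stores its current position in the heap array. */
    static final class Node {
        final long priority;
        int index = -1; // -1 means "not contained in a heap"

        Node(long priority) {
            this.priority = priority;
        }
    }

    private final List<Node> heap = new ArrayList<>();

    void add(Node n) {
        heap.add(n);
        n.index = heap.size() - 1;
        siftUp(n.index);
    }

    Node peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    boolean remove(Node n) {
        int i = n.index;
        // The stored index locates the slot; the identity check confirms ownership.
        if (i < 0 || i >= heap.size() || heap.get(i) != n) {
            return false;
        }
        Node last = heap.remove(heap.size() - 1);
        n.index = -1;
        if (last != n) {
            place(i, last);     // fill the hole with the former last element
            siftDown(i);
            siftUp(last.index); // no-op if the element already moved down
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).priority <= heap.get(i).priority) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).priority < heap.get(smallest).priority) {
                smallest = left;
            }
            if (right < n && heap.get(right).priority < heap.get(smallest).priority) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        Node x = heap.get(a);
        Node y = heap.get(b);
        place(a, y);
        place(b, x);
    }

    private void place(int i, Node n) {
        heap.set(i, n);
        n.index = i; // keep the element's stored index in sync
    }
}
```

An equal-but-distinct object is not removed here, because its stored index does not point at a slot that holds that very object.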
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199325523
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
+
+ outputStream.reset();
+
+ try {
+ outputView.writeShort(keyGroupId);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Override
+ public void add(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while getting element from RocksDB.", e);
+ }
+ }
+
+ @Override
+ public void remove(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.remove(columnFamilyHandle, elementBytes);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
+ }
+ }
+
+ /**
+ * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+ * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+ *
+ * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
+ */
+ @Override
+ public int size() {
+
+ int count = 0;
+ try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++count;
+ }
+ }
+
+ return count;
+ }
+
+ @Nonnull
+ @Override
+ public RocksToJavaIteratorAdapter orderedIterator() {
+
+ flushWriteBatch();
+
+ return new RocksToJavaIteratorAdapter(
+ new RocksIteratorWrapper(
+ db.newIterator(columnFamilyHandle, readOptions)));
+ }
+
+ /**
+ * Ensures that recent writes are flushed and reflect in the RocksDB instance.
+ */
+ private void flushWriteBatch() {
+ try {
+ batchWrapper.flush();
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+ for (int i = 0; i < prefixBytes.length; ++i) {
+ if (bytes[i] != prefixBytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private byte[] serializeElement(T element) {
+ try {
+ outputStream.reset();
+ outputView.writeShort(keyGroupId);
--- End diff --
nit: Could we use `outputView.setPosition(2)` here instead of rewriting the key-group prefix on every call? Not sure whether that works.
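
A sketch of the idea behind this nit, assuming the intent is to write the two key-group bytes once and then only rewind the write position to just behind them before each element. It uses `java.nio.ByteBuffer` as a stand-in so that it is self-contained. `PrefixReuseSketch` and the fixed `long` payload are hypothetical, and whether the Flink stream and view classes in the diff support the same repositioning is not verified here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical serializer that keeps a fixed 2-byte prefix in its buffer. */
final class PrefixReuseSketch {
    private static final int PREFIX_LENGTH = Short.BYTES;

    private final ByteBuffer buffer = ByteBuffer.allocate(PREFIX_LENGTH + Long.BYTES);

    PrefixReuseSketch(int keyGroupId) {
        // The key-group prefix is written exactly once.
        buffer.putShort((short) keyGroupId);
    }

    byte[] serialize(long element) {
        // Skip over the prefix instead of resetting and rewriting it.
        buffer.position(PREFIX_LENGTH);
        buffer.putLong(element);
        return Arrays.copyOf(buffer.array(), buffer.position());
    }
}
```

The saving per call is small (one `writeShort`), so this is mostly a readability and micro-optimization question.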
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567018
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+ /**
+ * The index of the head element in the array that represents the heap.
+ */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /**
+ * Comparator for the contained elements.
+ */
+ private final Comparator<T> elementComparator;
+
+ /**
+ * The array that represents the heap-organized priority queue.
+ */
+ private T[] queue;
+
+ /**
+ * The current size of the priority queue.
+ */
+ private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnegative int minimumCapacity) {
+
+ this.elementComparator = elementComparator;
+ this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+ }
+
+ @Override
+ @Nullable
+ public T peek() {
+ return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+ }
+
+ /**
+ * Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ return addInternal(toAdd);
+ }
+
+ /**
+ * This remove is based on object identity, not the result of equals.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toStop) {
--- End diff --
Maybe rename the method parameter to `toRemove`.
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199320135
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
+ * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
--- End diff --
duplicated `contains`
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199817513
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ Assert.assertNull(priorityQueue.poll());
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ Assert.assertNotNull(removed);
+ Assert.assertTrue(checkSet.remove(removed));
+ Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+ lastPriorityValue = removed.getPriority();
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+
+ Assert.assertNull(priorityQueue.poll());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
+ Assert.assertFalse(priorityQueue.isEmpty());
+
+ priorityQueue.poll();
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ @Test
+ public void testBulkAddRestoredTimers() throws Exception {
+ final int testSize = 10;
+ HashSet<TestElement> elementSet = new HashSet<>(testSize);
+ for (int i = 0; i < testSize; ++i) {
+ elementSet.add(new TestElement(i, i));
+ }
+
+ List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
+
+ for (TestElement testElement : elementSet) {
+ twoTimesElementSet.add(testElement.deepCopy());
+ twoTimesElementSet.add(testElement.deepCopy());
+ }
+
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ priorityQueue.addAll(twoTimesElementSet);
+ priorityQueue.addAll(elementSet);
+
+ final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
+
+ Assert.assertEquals(expectedSize, priorityQueue.size());
+ try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.remove(iterator.next()));
+ } else {
+ Assert.assertTrue(elementSet.contains(iterator.next()));
+ }
+ }
+ }
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testIterator() throws Exception {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ // test empty iterator
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ Assert.assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ Assert.fail();
+ } catch (NoSuchElementException ignore) {
+ }
+ }
+
+ // iterate some data
+ final int testSize = 10;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ Assert.assertTrue(checkSet.remove(iterator.next()));
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testAdd() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ TestElement lowPrioElement = new TestElement(4711L, 42L);
+ TestElement highPrioElement = new TestElement(815L, 23L);
+ Assert.assertTrue(priorityQueue.add(lowPrioElement));
+ if (testSetSemantics()) {
+ priorityQueue.add(lowPrioElement.deepCopy());
+ }
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertTrue(priorityQueue.add(highPrioElement));
+ Assert.assertEquals(2, priorityQueue.size());
+ Assert.assertEquals(highPrioElement, priorityQueue.poll());
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertEquals(lowPrioElement, priorityQueue.poll());
+ Assert.assertEquals(0, priorityQueue.size());
+ }
+
+ @Test
+ public void testRemove() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ final long key = 4711L;
+ final long priorityValue = 42L;
+ final TestElement testElement = new TestElement(key, priorityValue);
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.add(testElement));
+ Assert.assertTrue(priorityQueue.remove(testElement));
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
+
+ protected abstract boolean testSetSemantics();
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806792
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
--- End diff --
insertRandom**Elements**
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199807071
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ Assert.assertNull(priorityQueue.poll());
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ Assert.assertNotNull(removed);
+ Assert.assertTrue(checkSet.remove(removed));
+ Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+ lastPriorityValue = removed.getPriority();
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+
+ Assert.assertNull(priorityQueue.poll());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
+ Assert.assertFalse(priorityQueue.isEmpty());
+
+ priorityQueue.poll();
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ @Test
+ public void testBulkAddRestoredTimers() throws Exception {
+ final int testSize = 10;
+ HashSet<TestElement> elementSet = new HashSet<>(testSize);
+ for (int i = 0; i < testSize; ++i) {
+ elementSet.add(new TestElement(i, i));
+ }
+
+ List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
+
+ for (TestElement testElement : elementSet) {
+ twoTimesElementSet.add(testElement.deepCopy());
+ twoTimesElementSet.add(testElement.deepCopy());
+ }
+
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ priorityQueue.addAll(twoTimesElementSet);
+ priorityQueue.addAll(elementSet);
+
+ final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
+
+ Assert.assertEquals(expectedSize, priorityQueue.size());
+ try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.remove(iterator.next()));
+ } else {
+ Assert.assertTrue(elementSet.contains(iterator.next()));
+ }
+ }
+ }
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testIterator() throws Exception {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ // test empty iterator
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ Assert.assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ Assert.fail();
+ } catch (NoSuchElementException ignore) {
+ }
+ }
+
+ // iterate some data
+ final int testSize = 10;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ Assert.assertTrue(checkSet.remove(iterator.next()));
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testAdd() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ TestElement lowPrioElement = new TestElement(4711L, 42L);
+ TestElement highPrioElement = new TestElement(815L, 23L);
+ Assert.assertTrue(priorityQueue.add(lowPrioElement));
+ if (testSetSemantics()) {
+ priorityQueue.add(lowPrioElement.deepCopy());
+ }
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertTrue(priorityQueue.add(highPrioElement));
+ Assert.assertEquals(2, priorityQueue.size());
+ Assert.assertEquals(highPrioElement, priorityQueue.poll());
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertEquals(lowPrioElement, priorityQueue.poll());
+ Assert.assertEquals(0, priorityQueue.size());
+ }
+
+ @Test
+ public void testRemove() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ final long key = 4711L;
+ final long priorityValue = 42L;
+ final TestElement testElement = new TestElement(key, priorityValue);
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.add(testElement));
+ Assert.assertTrue(priorityQueue.remove(testElement));
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
+
+ protected abstract boolean testSetSemantics();
+
+ /**
+ * Payload for usage in the test.
+ */
+ protected static class TestElement implements HeapPriorityQueueElement {
+
+ private final long key;
+ private final long priority;
+ private int internalIndex;
+
+ public TestElement(long key, long priority) {
+ this.key = key;
+ this.priority = priority;
+ this.internalIndex = NOT_CONTAINED;
+ }
+
+ public long getKey() {
+ return key;
+ }
+
+ public long getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int getInternalIndex() {
+ return internalIndex;
+ }
+
+ @Override
+ public void setInternalIndex(int newIndex) {
+ internalIndex = newIndex;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestElement that = (TestElement) o;
+ return getKey() == that.getKey() &&
+ getPriority() == that.getPriority();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getKey(), getPriority());
+ }
+
+ public TestElement deepCopy() {
+ return new TestElement(key, priority);
+ }
+ }
+
+ /**
+ * Serializer for {@link TestElement}. The serialization format produced by this serializer allows lexicographic
+ * ordering by {@link TestElement#getPriority}.
+ */
+ protected static class TestElementSerializer extends TypeSerializer<TestElement> {
--- End diff --
Extending `CompositeSerializer` from #6196 might reduce the code a bit.
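
The javadoc quoted above requires a serialization format whose byte-wise lexicographic order matches the order by priority. As a minimal sketch of one way to get that property for a signed `long` (an illustration under stated assumptions, not the serializer from this PR): flip the sign bit and write the value big-endian, so that unsigned byte comparison agrees with signed numeric comparison. The class and method names below are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration; not part of the PR under review.
final class OrderPreservingLongEncoding {

    /** Encodes a signed long so that unsigned, byte-wise order equals numeric order. */
    static byte[] encode(long value) {
        // XOR with Long.MIN_VALUE flips the sign bit: negatives sort before positives.
        // ByteBuffer is big-endian by default, so the most significant byte comes first.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Reverses {@link #encode(long)}. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Lexicographic comparison that treats every byte as unsigned. */
    static int compareLexicographic(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long[] ascending = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 23L, Long.MAX_VALUE};
        for (int i = 1; i < ascending.length; i++) {
            int cmp = compareLexicographic(encode(ascending[i - 1]), encode(ascending[i]));
            System.out.println(ascending[i - 1] + " sorts before " + ascending[i] + ": " + (cmp < 0));
        }
    }
}
```

Without the sign-bit flip, a plain big-endian encoding would place every negative value after every positive one, because the sign bit would be read as the highest unsigned bit.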
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199841799
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
--- End diff --
good point 👍
---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6228
@sihuazhou @azagrebin thanks for the reviews! I will merge this once my travis is green.
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199427083
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
+
+ outputStream.reset();
+
+ try {
+ outputView.writeShort(keyGroupId);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Override
+ public void add(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
+ }
+ }
+
+ @Override
+ public void remove(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.remove(columnFamilyHandle, elementBytes);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
+ }
+ }
+
+ /**
+ * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+ * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+ *
+ * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
+ */
+ @Override
+ public int size() {
+
+ int count = 0;
+ try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++count;
+ }
+ }
+
+ return count;
+ }
+
+ @Nonnull
+ @Override
+ public RocksToJavaIteratorAdapter orderedIterator() {
+
+ flushWriteBatch();
+
+ return new RocksToJavaIteratorAdapter(
+ new RocksIteratorWrapper(
+ db.newIterator(columnFamilyHandle, readOptions)));
+ }
+
+ /**
+ * Ensures that recent writes are flushed and reflect in the RocksDB instance.
+ */
+ private void flushWriteBatch() {
+ try {
+ batchWrapper.flush();
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+ for (int i = 0; i < prefixBytes.length; ++i) {
+ if (bytes[i] != prefixBytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private byte[] serializeElement(T element) {
+ try {
+ outputStream.reset();
+ outputView.writeShort(keyGroupId);
--- End diff --
It depends. If we do it like this, we can share the same output stream and view instances across all stores, i.e. all key-groups in the operator. That way, we only have to allocate a single, larger `byte[]` as a "write buffer" instead of one per key-group. Similarly, I am considering sharing the same write batch later on.
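
The sharing idea can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `java.io.ByteArrayOutputStream` and `java.io.DataOutputStream` stand in for Flink's `ByteArrayOutputStreamWithPos` and `DataOutputViewStreamWrapper`, the `Store` class is hypothetical, and the element is reduced to a single `long`. Because each store rewrites its own key-group prefix on every call, it needs no private buffer of its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical illustration; names and element layout are assumptions.
final class SharedBufferStoreSketch {

    /** Stand-in for one per-key-group store that borrows a shared write buffer. */
    static final class Store {
        private final int keyGroupId;
        private final ByteArrayOutputStream sharedStream;
        private final DataOutputStream sharedView;

        Store(int keyGroupId, ByteArrayOutputStream sharedStream, DataOutputStream sharedView) {
            this.keyGroupId = keyGroupId;
            this.sharedStream = sharedStream;
            this.sharedView = sharedView;
        }

        /** Serializes [2-byte key-group prefix][8-byte payload] using the shared buffer. */
        byte[] serialize(long payload) {
            try {
                sharedStream.reset();              // reuse the buffer; drop previous content
                sharedView.writeShort(keyGroupId); // prefix is rewritten on every call
                sharedView.writeLong(payload);
                sharedView.flush();
                return sharedStream.toByteArray(); // copy out, so the buffer can be reused
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        // One buffer and one view, shared by all stores (single-threaded use assumed).
        ByteArrayOutputStream stream = new ByteArrayOutputStream(64);
        DataOutputStream view = new DataOutputStream(stream);

        Store keyGroup0 = new Store(0, stream, view);
        Store keyGroup1 = new Store(1, stream, view);

        System.out.println(Arrays.toString(keyGroup0.serialize(42L)));
        System.out.println(Arrays.toString(keyGroup1.serialize(42L)));
    }
}
```

The trade-off in this sketch is two extra bytes written per serialization call in exchange for a single allocation shared across all key-groups. Sharing is only safe if all stores are accessed from the same thread.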
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199325117
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+ /**
+ * Retrieves and removes the first element (w.r.t. the order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T poll();
+
+ /**
+ * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T peek();
+
+ /**
+ * Adds the given element to the set, if it is not already contained.
+ *
+ * @param toAdd the element to add to the set.
+ * @return <code>true</> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</> iff the head element was not changed by this operation.
+ */
+ boolean add(@Nonnull T toAdd);
+
+ /**
+ * Removes the given element from the set, if is contained in the set.
+ *
+ * @param toRemove the element to remove.
+ * @return <code>true</> if the operation changed the head element or if is it unclear if the head element changed.
--- End diff --
nit: unclosed HTML tag `<code>`
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199320321
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
--- End diff --
In `RocksDBKeyedStateBackend` we first compute `keyGroupPrefixBytes` (1 or 2 bytes). Why do we always use 2 bytes for the key group here?
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199442186
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
--- End diff --
I did it for simplicity, but you are right. Will change it to variable size.
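To make the variable-size idea concrete, here is a minimal sketch of how the prefix could be sized and written. The class and method names are hypothetical, and the threshold of 128 key-groups for switching from 1 to 2 bytes is an assumption modeled on the 1-or-2-byte rule mentioned above, not code taken from this PR.

```java
import java.util.Arrays;

/** Hypothetical helper, for illustration only; not part of the PR. */
final class KeyGroupPrefixSketch {

    /**
     * Assumed rule: one byte is enough while all key-group ids fit into
     * 0..127, otherwise two bytes are used.
     */
    static int prefixLength(int totalKeyGroups) {
        return totalKeyGroups > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    /**
     * Writes the id big-endian (most significant byte first), so that the
     * byte-wise order of the prefixes follows the numeric order of the ids.
     */
    static byte[] keyGroupBytes(int keyGroupId, int prefixLength) {
        final byte[] bytes = new byte[prefixLength];
        for (int i = 0; i < prefixLength; i++) {
            bytes[i] = (byte) (keyGroupId >>> ((prefixLength - 1 - i) * 8));
        }
        return bytes;
    }

    public static void main(String[] args) {
        final int totalKeyGroups = 1024;
        final int length = prefixLength(totalKeyGroups);
        System.out.println(length + " prefix byte(s): "
            + Arrays.toString(keyGroupBytes(300, length)));
    }
}
```

With such a helper the store would take the total number of key-groups (or the precomputed prefix length) as an additional constructor argument instead of hard-coding two bytes.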
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806862
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
--- End diff --
test**Remove**InsertMixKeepsOrder
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567286
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+ @Nonnull
+ private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+ /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupLists;
+
+ /** Function to extract the key from contained elements. */
+ @Nonnull
+ private final KeyExtractorFunction<T> keyExtractor;
+
+ /** The total number of key-groups (in the job). */
+ @Nonnegative
+ private final int totalKeyGroups;
+
+ /** The smallest key-group id with a subpartition managed by this ordered set. */
+ @Nonnegative
+ private final int firstKeyGroup;
+
+ @SuppressWarnings("unchecked")
+ public KeyGroupPartitionedPriorityQueue(
+ @Nonnull KeyExtractorFunction<T> keyExtractor,
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+ @Nonnull KeyGroupRange keyGroupRange,
+ @Nonnegative int totalKeyGroups) {
+
+ this.keyExtractor = keyExtractor;
+ this.totalKeyGroups = totalKeyGroups;
+ this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+ this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+ this.keyGroupHeap = new HeapPriorityQueue<>(
+ new InternalPriorityQueueComparator<>(elementComparator),
+ keyGroupRange.getNumberOfKeyGroups());
+ for (int i = 0; i < keyGroupLists.length; i++) {
+ final PQ keyGroupCache =
+ orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
+ keyGroupLists[i] = keyGroupCache;
+ keyGroupHeap.add(keyGroupCache);
+ }
+ }
+
+ @Nullable
+ @Override
+ public T poll() {
+ final PQ headList = keyGroupHeap.peek();
+ final T head = headList.poll();
+ keyGroupHeap.adjustModifiedElement(headList);
+ return head;
+ }
+
+ @Nullable
+ @Override
+ public T peek() {
+ return keyGroupHeap.peek().peek();
+ }
+
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ final PQ list = getListForElementKeyGroup(toAdd);
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.add(toAdd)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toAdd.equals(peek());
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean remove(@Nonnull T toRemove) {
+ final PQ list = getListForElementKeyGroup(toRemove);
+
+ final T oldHead = peek();
+
+ // the branch checks if the head element has (potentially) changed.
+ if (list.remove(toRemove)) {
+ keyGroupHeap.adjustModifiedElement(list);
+ // could we have a new head?
+ return toRemove.equals(oldHead);
+ } else {
+ // head unchanged
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return peek() == null;
+ }
+
+ @Override
+ public int size() {
+ int sizeSum = 0;
+ for (PQ list : keyGroupLists) {
+ sizeSum += list.size();
+ }
+ return sizeSum;
+ }
+
+ @Override
+ public void addAll(@Nullable Collection<? extends T> toAdd) {
+
+ if (toAdd == null) {
+ return;
+ }
+
+ // TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
+ for (T element : toAdd) {
+ add(element);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public CloseableIterator<T> iterator() {
+ return new KeyGroupConcatenationIterator<>(keyGroupLists);
+ }
+
+ private PQ getListForElementKeyGroup(T element) {
+ return keyGroupLists[computeKeyGroupIndex(element)];
+ }
+
+ private int computeKeyGroupIndex(T element) {
+ final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
+ final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+ return keyGroupId - firstKeyGroup;
+ }
+
+ /**
+ * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator is not guaranteeing any order of elements.
+ * Using code must {@link #close()} after usage.
+ *
+ * @param <T> the type of iterated elements.
+ */
+ private static final class KeyGroupConcatenationIterator<
+ T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements CloseableIterator<T> {
+
+ /** Array with the subpartitions that we iterate. No null values in the array. */
+ @Nonnull
+ private final PQS[] keyGroupLists;
+
+ /** The subpartition that is currently iterated. */
+ @Nonnegative
+ private int index;
+
+ /** The iterator of the current subpartition. */
+ @Nonnull
+ private CloseableIterator<T> current;
+
+ private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
+ this.keyGroupLists = keyGroupLists;
+ this.index = 0;
+ this.current = CloseableIterator.empty();
+ }
+
+ @Override
+ public boolean hasNext() {
+ boolean currentHasNext = current.hasNext();
+
+ // find the iterator of the next partition that has elements.
+ while (!currentHasNext && index < keyGroupLists.length) {
+ IOUtils.closeQuietly(current);
+ current = keyGroupLists[index++].iterator();
+ currentHasNext = current.hasNext();
+ }
+ return currentHasNext;
+ }
+
+ @Override
+ public T next() {
+ return current.next();
+ }
+
+ @Override
+ public void close() throws Exception {
+ current.close();
+ }
+ }
+
+ /**
+ * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
+ * from {@link #peek()}.
+ *
+ * @param <T> type of the elements in the compared queues.
+ * @param <Q> type of queue.
+ */
+ private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
+ implements Comparator<Q> {
+
+ /** Comparator for the queue elements, so we can compare their heads. */
+ @Nonnull
+ private final Comparator<T> elementComparator;
+
+ InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
+ this.elementComparator = elementComparator;
+ }
+
+ @Override
+ public int compare(Q o1, Q o2) {
+ final T leftTimer = o1.peek();
+ final T rightTimer = o2.peek();
--- End diff --
`xTimer` -> `xElement`, probably left over from timer heap
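For illustration, the comparator could read as follows after the rename. This is only a sketch: it uses `java.util.Queue` in place of `InternalPriorityQueue` so that it is self-contained, the class name is hypothetical, and the null handling (empty queues sort last) is an assumption, since the quoted diff ends before that part of the method.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Queue;

/** Hypothetical stand-in that compares queues by their head element. */
final class HeadComparatorSketch<T> implements Comparator<Queue<T>> {

    /** Comparator for the queue elements, so we can compare the heads. */
    private final Comparator<T> elementComparator;

    HeadComparatorSketch(Comparator<T> elementComparator) {
        this.elementComparator = elementComparator;
    }

    @Override
    public int compare(Queue<T> o1, Queue<T> o2) {
        final T leftElement = o1.peek();
        final T rightElement = o2.peek();

        // Assumption: an empty queue (null head) sorts after any non-empty one.
        if (leftElement == null) {
            return rightElement == null ? 0 : 1;
        }
        return rightElement == null
            ? -1
            : elementComparator.compare(leftElement, rightElement);
    }

    public static void main(String[] args) {
        Queue<Long> first = new ArrayDeque<>();
        first.add(1L);
        Queue<Long> empty = new ArrayDeque<>();
        HeadComparatorSketch<Long> comparator =
            new HeadComparatorSketch<>(Comparator.naturalOrder());
        System.out.println(comparator.compare(first, empty));
    }
}
```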
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199815863
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+ @Nonnull
+ private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+ /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupLists;
--- End diff --
👍
---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6228
CC @azagrebin @sihuazhou
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199812074
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+ /**
+ * Retrieves and removes the first element (w.r.t. the order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T poll();
+
+ /**
+ * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T peek();
+
+ /**
+ * Adds the given element to the set, if it is not already contained.
+ *
+ * @param toAdd the element to add to the set.
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ boolean add(@Nonnull T toAdd);
+
+ /**
+ * Removes the given element from the set, if it is contained in the set.
+ *
+ * @param toRemove the element to remove.
+ * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567420
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --
An empty value might take less space: `new byte[] {}`. If RocksDB does not support that, `new byte[] { 0 }` would at least be simpler.
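The size difference between the three candidates can be checked with a small sketch. The class and constant names are hypothetical, and it assumes that `ConfigConstants.DEFAULT_CHARSET` is UTF-8; whether RocksDB accepts a zero-length value is left open here, as in the comment above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical comparison of the candidate dummy values. */
final class DummyValueSketch {

    /** The value as in the diff, assuming the default charset is UTF-8. */
    static final byte[] AS_IN_DIFF = "0".getBytes(StandardCharsets.UTF_8);

    /** Zero-length value. */
    static final byte[] EMPTY = new byte[] {};

    /** Single zero byte, without going through a charset. */
    static final byte[] SINGLE_ZERO = new byte[] {0};

    public static void main(String[] args) {
        System.out.println("as in diff:  " + AS_IN_DIFF.length + " byte(s)");
        System.out.println("empty:       " + EMPTY.length + " byte(s)");
        System.out.println("single zero: " + SINGLE_ZERO.length + " byte(s)");
    }
}
```

Note that `"0"` encodes to the single ASCII byte for the character `0`, so it is the same size as `new byte[] { 0 }`; only the zero-length array actually saves space per entry.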
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199816750
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
--- End diff --
👍
---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6228
@sihuazhou thanks for the fast review. I addressed all your comments.
---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6228
+1 from my side
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199426052
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
+
+ outputStream.reset();
+
+ try {
+ outputView.writeShort(keyGroupId);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Override
+ public void add(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
+ }
+ }
+
+ @Override
+ public void remove(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.remove(columnFamilyHandle, elementBytes);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
+ }
+ }
+
+ /**
+ * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+ * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+ *
+ * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
+ */
+ @Override
+ public int size() {
+
+ int count = 0;
+ try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++count;
+ }
+ }
+
+ return count;
+ }
+
+ @Nonnull
+ @Override
+ public RocksToJavaIteratorAdapter orderedIterator() {
+
+ flushWriteBatch();
+
+ return new RocksToJavaIteratorAdapter(
+ new RocksIteratorWrapper(
+ db.newIterator(columnFamilyHandle, readOptions)));
+ }
+
+ /**
+ * Ensures that recent writes are flushed and reflected in the RocksDB instance.
+ */
+ private void flushWriteBatch() {
+ try {
+ batchWrapper.flush();
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+ for (int i = 0; i < prefixBytes.length; ++i) {
+ if (bytes[i] != prefixBytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private byte[] serializeElement(T element) {
+ try {
+ outputStream.reset();
+ outputView.writeShort(keyGroupId);
+ byteOrderProducingSerializer.serialize(element, outputView);
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Error while serializing the element.", e);
+ }
+ }
+
+ private T deserializeElement(byte[] bytes) {
+ try {
+ // TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call
+ ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
+ DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
+ inputView.readShort();
--- End diff --
`skip()` also has a return value that should be checked (in general, though maybe not for the particular stream we use here), just like `skipBytes(...)`. I think doing the check is still cleaner, and in that case we can just as well use `readShort`, which is also easier to identify as the counterpart of the corresponding `writeShort(...)`.
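The trade-off discussed here can be sketched with plain `java.io` streams. This is only an illustration: `DataInputStream` and `ByteArrayInputStream` stand in for Flink's `DataInputViewStreamWrapper` and `ByteArrayInputStreamWithPos`, and the class and method names are hypothetical. `InputStream.skip(n)` reports how many bytes were actually skipped, so the caller has to check it, while `readShort()` either consumes two bytes or throws `EOFException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

public class PrefixSkipSketch {

    /** Writes a 2-byte key-group prefix followed by a long payload. */
    public static byte[] serialize(int keyGroupId, long payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(keyGroupId);
            out.writeLong(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Consumes the prefix with readShort(), the counterpart of writeShort(). */
    public static long deserializeWithReadShort(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.readShort(); // throws EOFException if fewer than 2 bytes remain
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Consumes the prefix with skip(), whose return value must be checked. */
    public static long deserializeWithSkip(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            long skipped = in.skip(2);
            if (skipped != 2) {
                throw new EOFException("Expected to skip 2 bytes, skipped " + skipped);
            }
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(7, 42L);
        System.out.println(deserializeWithReadShort(data));
        System.out.println(deserializeWithSkip(data));
    }
}
```

Both variants read the same payload; the `readShort()` version needs no extra check and mirrors the write path.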
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199566880
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Function to extract a key from a given object.
+ *
+ * @param <T> type of the element from which we extract the key.
+ */
+@FunctionalInterface
+public interface KeyExtractorFunction<T> {
--- End diff --
Is there a particular reason to create a named interface? It could be just Function<T, Object>.
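To make the question concrete, here is a minimal sketch comparing the two options. The interface `NamedKeyExtractor` and its method name `extractKey` are hypothetical stand-ins, since the quoted diff does not show the body of `KeyExtractorFunction`. Both variants accept the same method reference; the named interface mainly adds a domain-specific type and method name at the call site.

```java
import java.util.function.Function;

public class KeyExtractorSketch {

    /** Hypothetical named interface; the method name is an assumption. */
    @FunctionalInterface
    public interface NamedKeyExtractor<T> {
        Object extractKey(T element);
    }

    /** Extracts the key through the named interface. */
    public static <T> Object keyViaNamed(NamedKeyExtractor<T> extractor, T element) {
        return extractor.extractKey(element);
    }

    /** Extracts the key through the generic java.util.function.Function. */
    public static <T> Object keyViaFunction(Function<T, Object> extractor, T element) {
        return extractor.apply(element);
    }

    public static void main(String[] args) {
        // The same method reference satisfies both functional interfaces.
        System.out.println(keyViaNamed(String::length, "timer"));
        System.out.println(keyViaFunction(String::length, "timer"));
    }
}
```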
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567240
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+ implements InternalPriorityQueue<T> {
+
+ /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
+ @Nonnull
+ private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+ /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+ @Nonnull
+ private final PQ[] keyGroupLists;
--- End diff --
I would suggest renaming it to `KeyGroupQueueSets`, along with `getListForElementKeyGroup` and `keyGroupCache`. The current names are probably leftovers.
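For readers following the Javadoc quoted in this diff, the heap-of-heaps idea can be sketched with `java.util.PriorityQueue`. This is a simplified illustration, not the PR's implementation: the class name is hypothetical, elements are plain `long` priorities, there are no set semantics, and re-positioning a sub-heap uses `PriorityQueue.remove(Object)`, which is linear in the number of key-groups.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapOfHeapsSketch {

    /** One sub-heap per key-group, indexed by key-group id. */
    private final List<PriorityQueue<Long>> subHeaps = new ArrayList<>();

    /** Super-heap over the non-empty sub-heaps, ordered by each sub-heap's head. */
    private final PriorityQueue<PriorityQueue<Long>> superHeap =
        new PriorityQueue<>(Comparator.comparingLong((PriorityQueue<Long> q) -> q.peek()));

    public HeapOfHeapsSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            subHeaps.add(new PriorityQueue<>());
        }
    }

    /** Adds a priority to the sub-heap of the given key-group. */
    public void add(int keyGroup, long priority) {
        PriorityQueue<Long> sub = subHeaps.get(keyGroup);
        superHeap.remove(sub); // no-op if the sub-heap was empty and thus absent
        sub.add(priority);
        superHeap.add(sub);    // re-insert so the super-heap sees the new head
    }

    /** Removes and returns the smallest priority across all key-groups, or null. */
    public Long poll() {
        PriorityQueue<Long> head = superHeap.poll();
        if (head == null) {
            return null;
        }
        Long result = head.poll();
        if (!head.isEmpty()) {
            superHeap.add(head);
        }
        return result;
    }

    public static void main(String[] args) {
        HeapOfHeapsSketch queue = new HeapOfHeapsSketch(3);
        queue.add(0, 30L);
        queue.add(1, 10L);
        queue.add(2, 20L);
        queue.add(1, 40L);
        Long next;
        while ((next = queue.poll()) != null) {
            System.out.println(next);
        }
    }
}
```

The sub-heap is taken out of the super-heap before it is mutated, so the super-heap never holds an entry whose ordering key changed underneath it.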
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199325190
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
--- End diff --
Should `RocksDBOrderedStore` be named `RocksDBOrderedSetStore`?
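The Javadoc's requirement that serialized bytes sort lexicographically in the elements' logical order can be illustrated for a single `long` priority. This is a sketch under assumptions: the class and method names are hypothetical, and it assumes the store compares keys as unsigned bytes, as RocksDB's default bytewise comparator does. A big-endian encoding with the sign bit flipped keeps negative values ahead of positive ones.

```java
import java.nio.ByteBuffer;

public class ByteOrderSketch {

    /** Encodes a long as 8 big-endian bytes with the sign bit flipped. */
    public static byte[] toOrderedBytes(long value) {
        // ByteBuffer is big-endian by default; XOR with MIN_VALUE flips the sign bit.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Compares two byte arrays lexicographically, treating bytes as unsigned. */
    public static int compareUnsignedLexicographic(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long[] values = {Long.MIN_VALUE, -1L, 0L, 1L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < values.length; i++) {
            int cmp = compareUnsignedLexicographic(
                toOrderedBytes(values[i]), toOrderedBytes(values[i + 1]));
            System.out.println(values[i] + " sorts before " + values[i + 1] + ": " + (cmp < 0));
        }
    }
}
```

Without the sign-bit flip, the two's-complement encoding of `-1` would start with `0xFF` and sort after every non-negative value.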
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199320254
--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+ /** Serialized empty value to insert into RocksDB. */
+ private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+ /** The RocksDB instance that serves as store. */
+ @Nonnull
+ private final RocksDB db;
+
+ /** Handle to the column family of the RocksDB instance in which the elements are stored. */
+ @Nonnull
+ private final ColumnFamilyHandle columnFamilyHandle;
+
+ /** Read options for RocksDB. */
+ @Nonnull
+ private final ReadOptions readOptions;
+
+ /**
+ * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
+ * aligned with their logical order.
+ */
+ @Nonnull
+ private final TypeSerializer<T> byteOrderProducingSerializer;
+
+ /** Wrapper to batch all writes to RocksDB. */
+ @Nonnull
+ private final RocksDBWriteBatchWrapper batchWrapper;
+
+ /** The key-group id of all elements stored in this instance. */
+ @Nonnegative
+ private final int keyGroupId;
+
+ /** The key-group id in serialized form. */
+ @Nonnull
+ private final byte[] groupPrefixBytes;
+
+ /** Output stream that helps to serialize elements. */
+ @Nonnull
+ private final ByteArrayOutputStreamWithPos outputStream;
+
+ /** Output view that helps to serialize elements, must wrap the output stream. */
+ @Nonnull
+ private final DataOutputViewStreamWrapper outputView;
+
+ public RocksDBOrderedStore(
+ @Nonnegative int keyGroupId,
+ @Nonnull RocksDB db,
+ @Nonnull ColumnFamilyHandle columnFamilyHandle,
+ @Nonnull ReadOptions readOptions,
+ @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+ @Nonnull ByteArrayOutputStreamWithPos outputStream,
+ @Nonnull DataOutputViewStreamWrapper outputView,
+ @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+ this.db = db;
+ this.columnFamilyHandle = columnFamilyHandle;
+ this.readOptions = readOptions;
+ this.byteOrderProducingSerializer = byteOrderProducingSerializer;
+ this.outputStream = outputStream;
+ this.outputView = outputView;
+ this.keyGroupId = keyGroupId;
+ this.batchWrapper = batchWrapper;
+ this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
+ }
+
+ private byte[] createKeyGroupBytes(int keyGroupId) {
+
+ outputStream.reset();
+
+ try {
+ outputView.writeShort(keyGroupId);
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Could not write key-group bytes.", e);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Override
+ public void add(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
+ }
+ }
+
+ @Override
+ public void remove(@Nonnull T element) {
+ byte[] elementBytes = serializeElement(element);
+ try {
+ batchWrapper.remove(columnFamilyHandle, elementBytes);
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
+ }
+ }
+
+ /**
+ * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
+ * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
+ *
+ * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
+ */
+ @Override
+ public int size() {
+
+ int count = 0;
+ try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++count;
+ }
+ }
+
+ return count;
+ }
+
+ @Nonnull
+ @Override
+ public RocksToJavaIteratorAdapter orderedIterator() {
+
+ flushWriteBatch();
+
+ return new RocksToJavaIteratorAdapter(
+ new RocksIteratorWrapper(
+ db.newIterator(columnFamilyHandle, readOptions)));
+ }
+
+ /**
+ * Ensures that recent writes are flushed and reflected in the RocksDB instance.
+ */
+ private void flushWriteBatch() {
+ try {
+ batchWrapper.flush();
+ } catch (RocksDBException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+ for (int i = 0; i < prefixBytes.length; ++i) {
+ if (bytes[i] != prefixBytes[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private byte[] serializeElement(T element) {
+ try {
+ outputStream.reset();
+ outputView.writeShort(keyGroupId);
+ byteOrderProducingSerializer.serialize(element, outputView);
+ return outputStream.toByteArray();
+ } catch (IOException e) {
+ throw new FlinkRuntimeException("Error while serializing the element.", e);
+ }
+ }
+
+ private T deserializeElement(byte[] bytes) {
+ try {
+ // TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call
+ ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
+ DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
+ inputView.readShort();
--- End diff --
Maybe we could use `inputView.skip(2)` here.
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806997
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ Assert.assertNull(priorityQueue.poll());
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ Assert.assertNotNull(removed);
+ Assert.assertTrue(checkSet.remove(removed));
+ Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+ lastPriorityValue = removed.getPriority();
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+
+ Assert.assertNull(priorityQueue.poll());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
+ Assert.assertFalse(priorityQueue.isEmpty());
+
+ priorityQueue.poll();
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ @Test
+ public void testBulkAddRestoredTimers() throws Exception {
+ final int testSize = 10;
+ HashSet<TestElement> elementSet = new HashSet<>(testSize);
+ for (int i = 0; i < testSize; ++i) {
+ elementSet.add(new TestElement(i, i));
+ }
+
+ List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
+
+ for (TestElement testElement : elementSet) {
+ twoTimesElementSet.add(testElement.deepCopy());
+ twoTimesElementSet.add(testElement.deepCopy());
+ }
+
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ priorityQueue.addAll(twoTimesElementSet);
+ priorityQueue.addAll(elementSet);
+
+ final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
+
+ Assert.assertEquals(expectedSize, priorityQueue.size());
+ try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.remove(iterator.next()));
+ } else {
+ Assert.assertTrue(elementSet.contains(iterator.next()));
+ }
+ }
+ }
+ if (testSetSemantics()) {
+ Assert.assertTrue(elementSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testIterator() throws Exception {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ // test empty iterator
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ Assert.assertFalse(iterator.hasNext());
+ try {
+ iterator.next();
+ Assert.fail();
+ } catch (NoSuchElementException ignore) {
+ }
+ }
+
+ // iterate some data
+ final int testSize = 10;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+ try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
+ while (iterator.hasNext()) {
+ Assert.assertTrue(checkSet.remove(iterator.next()));
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+ }
+ }
+
+ @Test
+ public void testAdd() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ TestElement lowPrioElement = new TestElement(4711L, 42L);
+ TestElement highPrioElement = new TestElement(815L, 23L);
+ Assert.assertTrue(priorityQueue.add(lowPrioElement));
+ if (testSetSemantics()) {
+ priorityQueue.add(lowPrioElement.deepCopy());
+ }
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertTrue(priorityQueue.add(highPrioElement));
+ Assert.assertEquals(2, priorityQueue.size());
+ Assert.assertEquals(highPrioElement, priorityQueue.poll());
+ Assert.assertEquals(1, priorityQueue.size());
+ Assert.assertEquals(lowPrioElement, priorityQueue.poll());
+ Assert.assertEquals(0, priorityQueue.size());
+ }
+
+ @Test
+ public void testRemove() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ final long key = 4711L;
+ final long priorityValue = 42L;
+ final TestElement testElement = new TestElement(key, priorityValue);
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.add(testElement));
+ Assert.assertTrue(priorityQueue.remove(testElement));
+ if (testSetSemantics()) {
+ Assert.assertFalse(priorityQueue.remove(testElement));
+ }
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
+
+ protected abstract boolean testSetSemantics();
--- End diff --
Maybe a more readable name would help here, e.g. `testSetSemanticsUniqueElements` or something similar.
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199817158
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
--- End diff --
👍
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199567167
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
+ * not on equals.
+ *
+ * <p>Possible future improvements:
+ * <ul>
+ * <li>We could also implement shrinking for the heap.</li>
+ * </ul>
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
+
+ /**
+ * The index of the head element in the array that represents the heap.
+ */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /**
+ * Comparator for the contained elements.
+ */
+ private final Comparator<T> elementComparator;
+
+ /**
+ * The array that represents the heap-organized priority queue.
+ */
+ private T[] queue;
+
+ /**
+ * The current size of the priority queue.
+ */
+ private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementComparator comparator for the contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull Comparator<T> elementComparator,
+ @Nonnegative int minimumCapacity) {
+
+ this.elementComparator = elementComparator;
+ this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+ }
+
+ @Override
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
+ }
+
+ @Override
+ @Nullable
+ public T peek() {
+ return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+ }
+
+ /**
+ * Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean add(@Nonnull T toAdd) {
+ return addInternal(toAdd);
+ }
+
+ /**
+ * This remove is based on object identity, not the result of equals.
+ *
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ @Override
+ public boolean remove(@Nonnull T toStop) {
+ return removeInternal(toStop);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ @Override
+ @Nonnegative
+ public int size() {
+ return size;
+ }
+
+ public void clear() {
+ size = 0;
+ Arrays.fill(queue, null);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ @Nonnull
+ public <O> O[] toArray(O[] out) {
--- End diff --
Why does this method introduce another arbitrary type parameter `O`? `T` seems to work, at least for the tests.
Also, `out` should be annotated with `@Nonnull`.
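
For context, here is a minimal sketch of what an independent type parameter allows, assuming a simplified stand-in class (`TinyQueue` and `toTypedArray` are hypothetical and not part of the PR). `java.util.Collection#toArray(T[])` declares its own method-level type parameter in the same way, so callers can pass an array of a supertype of the element type; restricting the parameter to the class-level `T` would rule that out.

```java
import java.util.Arrays;

public class ToArraySketch {

    /** Hypothetical stand-in for a queue that keeps its elements in an array. */
    static final class TinyQueue<T> {
        private final Object[] elements;
        private final int size;

        @SafeVarargs
        TinyQueue(T... initial) {
            this.elements = initial.clone();
            this.size = initial.length;
        }

        /** Variant with an independent parameter O, as in java.util.Collection#toArray. */
        @SuppressWarnings("unchecked")
        <O> O[] toArray(O[] out) {
            if (out.length < size) {
                // Allocate a new array with the caller's runtime array type.
                return (O[]) Arrays.copyOf(elements, size, out.getClass());
            }
            System.arraycopy(elements, 0, out, 0, size);
            if (out.length > size) {
                out[size] = null; // end marker, same convention as Collection#toArray
            }
            return out;
        }

        /** Variant restricted to T: callers must pass a T[]. */
        T[] toTypedArray(T[] out) {
            return toArray(out);
        }
    }

    public static void main(String[] args) {
        TinyQueue<Integer> queue = new TinyQueue<>(3, 1, 2);

        // Compiles only with the independent parameter O.
        Number[] asNumbers = queue.toArray(new Number[0]);
        // queue.toTypedArray(new Number[0]) would not compile: Number[] is not an Integer[].
        Integer[] asIntegers = queue.toTypedArray(new Integer[0]);

        System.out.println(Arrays.toString(asNumbers));
        System.out.println(Arrays.toString(asIntegers));
    }
}
```

Whether the extra flexibility is needed in `HeapPriorityQueue` is the open question of this comment; if every caller passes a `T[]`, the restricted signature is the simpler choice.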
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199817285
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+ Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead = element.equals(priorityQueue.peek());
+ if (removesHead) {
+ Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(), priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
+
+ while ((element = priorityQueue.poll()) != null) {
+ Assert.assertTrue(element.getPriority() >= lastPriorityValue);
+ lastPriorityValue = element.getPriority();
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ priorityQueue.addAll(checkSet);
+ }
+ }
+
+ @Test
+ public void testPoll() {
+ InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+
+ Assert.assertNull(priorityQueue.poll());
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ Assert.assertNotNull(removed);
+ Assert.assertTrue(checkSet.remove(removed));
+ Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
+ lastPriorityValue = removed.getPriority();
+ }
+ Assert.assertTrue(checkSet.isEmpty());
+
+ Assert.assertNull(priorityQueue.poll());
+ }
+
+ @Test
+ public void testIsEmpty() {
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(1);
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+
+ Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
+ Assert.assertFalse(priorityQueue.isEmpty());
+
+ priorityQueue.poll();
+ Assert.assertTrue(priorityQueue.isEmpty());
+ }
+
+ @Test
+ public void testBulkAddRestoredTimers() throws Exception {
--- End diff --
👍
---
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6228
@StefanRRichter very nice PR, I only had some very minor comments.
---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199473489
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+ /**
+ * Retrieves and removes the first element (w.r.t. the order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T poll();
+
+ /**
+ * Retrieves, but does not remove, the element (w.r.t. order) of this set,
+ * or returns {@code null} if this set is empty.
+ *
+ * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+ */
+ @Nullable
+ T peek();
+
+ /**
+ * Adds the given element to the set, if it is not already contained.
+ *
+ * @param toAdd the element to add to the set.
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
+ */
+ boolean add(@Nonnull T toAdd);
+
+ /**
+ * Removes the given element from the set, if is contained in the set.
+ *
+ * @param toRemove the element to remove.
+ * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
+ * Only returns <code>false</code> iff the head element was not changed by this operation.
--- End diff --
typo: if **it is** unclear
and **iff** already implies "only" :)
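
The return-value contract that this Javadoc describes can be sketched as follows. This is only an illustration under stated assumptions: `HeadTrackingQueue` is a hypothetical wrapper around `java.util.PriorityQueue`, not the PR's implementation, and it detects a head change by comparing object identity before and after the operation.

```java
import java.util.PriorityQueue;

public class HeadChangeSketch {

    /** Hypothetical wrapper; the names are illustrative and not part of Flink. */
    static final class HeadTrackingQueue<T extends Comparable<T>> {
        private final PriorityQueue<T> delegate = new PriorityQueue<>();

        /** Returns true iff a different object is at the head after this add. */
        boolean add(T toAdd) {
            T oldHead = delegate.peek();
            delegate.add(toAdd);
            return delegate.peek() != oldHead;
        }

        /** Returns true iff a different object is at the head after this remove. */
        boolean remove(T toRemove) {
            T oldHead = delegate.peek();
            delegate.remove(toRemove);
            return delegate.peek() != oldHead;
        }

        T peek() {
            return delegate.peek();
        }
    }

    public static void main(String[] args) {
        HeadTrackingQueue<Long> queue = new HeadTrackingQueue<>();
        System.out.println(queue.add(42L));     // true: first element becomes the head
        System.out.println(queue.add(100L));    // false: 42 stays at the head
        System.out.println(queue.add(7L));      // true: 7 is the new head
        System.out.println(queue.remove(100L)); // false: head is still 7
        System.out.println(queue.remove(7L));   // true: head moves on to 42
    }
}
```

The interface under review is looser than this sketch: it also permits `true` when it is unclear whether the head changed, so callers can rely only on `false` meaning that the head is unchanged.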
---