You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/30 16:46:41 UTC

[flink] branch master updated: [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 21e844b6c00 [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.
21e844b6c00 is described below

commit 21e844b6c00b1796fdfc00136ca26d90e889b149
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jan 30 00:00:45 2023 +0800

    [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.
    
    This closes #21779.
---
 .../runtime/watermarkstatus/HeapPriorityQueue.java | 294 +++++++++++++++++++
 .../watermarkstatus/StatusWatermarkValve.java      |   3 +-
 .../watermarkstatus/HeapPriorityQueueTest.java     | 316 +++++++++++++++++++++
 3 files changed, 611 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java
new file mode 100644
index 00000000000..550012baea5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermarkstatus;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * This class offers functionality similar to {@link
+ * org.apache.flink.runtime.state.heap.HeapPriorityQueue}. It is introduced as a replacement for
+ * {@link org.apache.flink.runtime.state.heap.HeapPriorityQueue} to be used in {@link
+ * StatusWatermarkValve}, to avoid affecting the performance of the memory state backend.
+ *
+ * <p>The reason why the performance of the memory state backend would be affected if we reused
+ * {@link org.apache.flink.runtime.state.heap.HeapPriorityQueue}: In some scenarios, {@link
+ * org.apache.flink.runtime.state.heap.HeapPriorityQueueElement} has only one implementation
+ * (used by the memory state backend), which allows the JVM to inline its methods
+ * (getInternalIndex, setInternalIndex). If we reused it in {@link StatusWatermarkValve}, it
+ * would have multiple implementations. Once there are multiple implementations, its methods
+ * are difficult for the JVM to inline, which results in poor performance of the memory state
+ * backend.
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueue.HeapPriorityQueueElement> {
+
+    /** The index of the head element in the array that represents the heap. */
+    private static final int QUEUE_HEAD_INDEX = 1;
+
+    /** Comparator for the priority of contained elements. */
+    @Nonnull private final PriorityComparator<T> elementPriorityComparator;
+
+    /** The array that represents the heap-organized priority queue. */
+    @Nonnull private T[] queue;
+
+    /** The current size of the priority queue. */
+    @Nonnegative private int size;
+
+    /**
+     * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+     *
+     * @param elementPriorityComparator comparator for the priority of contained elements.
+     * @param minimumCapacity the minimum and initial capacity of this priority queue.
+     */
+    @SuppressWarnings("unchecked")
+    public HeapPriorityQueue(
+            @Nonnull PriorityComparator<T> elementPriorityComparator,
+            @Nonnegative int minimumCapacity) {
+        this.queue = (T[]) new HeapPriorityQueueElement[getHeadElementIndex() + minimumCapacity];
+        this.size = 0;
+        this.elementPriorityComparator = elementPriorityComparator;
+    }
+
+    public void adjustModifiedElement(@Nonnull T element) {
+        final int elementIndex = element.getInternalIndex();
+        if (element == queue[elementIndex]) {
+            adjustElementAtIndex(element, elementIndex);
+        }
+    }
+
+    @Nullable
+    public T poll() {
+        return size() > 0 ? removeInternal(getHeadElementIndex()) : null;
+    }
+
+    @Nullable
+    public T peek() {
+        // Removed elements' slots are set to null, so an empty queue yields null.
+        return queue[getHeadElementIndex()];
+    }
+
+    public boolean add(@Nonnull T toAdd) {
+        addInternal(toAdd);
+        return toAdd.getInternalIndex() == getHeadElementIndex();
+    }
+
+    public boolean remove(@Nonnull T toRemove) {
+        final int elementIndex = toRemove.getInternalIndex();
+        removeInternal(elementIndex);
+        return elementIndex == getHeadElementIndex();
+    }
+
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    /** Clears the queue. */
+    public void clear() {
+        final int arrayOffset = getHeadElementIndex();
+        Arrays.fill(queue, arrayOffset, arrayOffset + size, null);
+        size = 0;
+    }
+
+    private void resizeQueueArray(int desiredSize, int minRequiredSize) {
+        if (isValidArraySize(desiredSize)) {
+            queue = Arrays.copyOf(queue, desiredSize);
+        } else if (isValidArraySize(minRequiredSize)) {
+            queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
+        } else {
+            throw new OutOfMemoryError(
+                    "Required minimum heap size "
+                            + minRequiredSize
+                            + " exceeds maximum size of "
+                            + MAX_ARRAY_SIZE
+                            + ".");
+        }
+    }
+
+    private void moveElementToIdx(T element, int idx) {
+        queue[idx] = element;
+        element.setInternalIndex(idx);
+    }
+
+    private static boolean isValidArraySize(int size) {
+        return size >= 0 && size <= MAX_ARRAY_SIZE;
+    }
+
+    private int getHeadElementIndex() {
+        return QUEUE_HEAD_INDEX;
+    }
+
+    private void addInternal(@Nonnull T element) {
+        final int newSize = increaseSizeByOne();
+        moveElementToIdx(element, newSize);
+        siftUp(newSize);
+    }
+
+    private T removeInternal(int removeIdx) {
+        T[] heap = this.queue;
+        T removedValue = heap[removeIdx];
+
+        assert removedValue.getInternalIndex() == removeIdx;
+
+        final int oldSize = size;
+
+        if (removeIdx != oldSize) {
+            T element = heap[oldSize];
+            moveElementToIdx(element, removeIdx);
+            adjustElementAtIndex(element, removeIdx);
+        }
+
+        heap[oldSize] = null;
+
+        --size;
+        return removedValue;
+    }
+
+    private void adjustElementAtIndex(T element, int index) {
+        siftDown(index);
+        if (queue[index] == element) {
+            siftUp(index);
+        }
+    }
+
+    private void siftUp(int idx) {
+        final T[] heap = this.queue;
+        final T currentElement = heap[idx];
+        int parentIdx = idx >>> 1;
+
+        while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
+            moveElementToIdx(heap[parentIdx], idx);
+            idx = parentIdx;
+            parentIdx >>>= 1;
+        }
+
+        moveElementToIdx(currentElement, idx);
+    }
+
+    private void siftDown(int idx) {
+        final T[] heap = this.queue;
+        final int heapSize = this.size;
+
+        final T currentElement = heap[idx];
+        int firstChildIdx = idx << 1;
+        int secondChildIdx = firstChildIdx + 1;
+
+        if (isElementIndexValid(secondChildIdx, heapSize)
+                && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+            firstChildIdx = secondChildIdx;
+        }
+
+        while (isElementIndexValid(firstChildIdx, heapSize)
+                && isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {
+            moveElementToIdx(heap[firstChildIdx], idx);
+            idx = firstChildIdx;
+            firstChildIdx = idx << 1;
+            secondChildIdx = firstChildIdx + 1;
+
+            if (isElementIndexValid(secondChildIdx, heapSize)
+                    && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+                firstChildIdx = secondChildIdx;
+            }
+        }
+
+        moveElementToIdx(currentElement, idx);
+    }
+
+    private boolean isElementIndexValid(int elementIndex, int heapSize) {
+        return elementIndex <= heapSize;
+    }
+
+    private boolean isElementPriorityLessThen(T a, T b) {
+        return elementPriorityComparator.comparePriority(a, b) < 0;
+    }
+
+    private int increaseSizeByOne() {
+        final int oldArraySize = queue.length;
+        final int minRequiredNewSize = ++size;
+        if (minRequiredNewSize >= oldArraySize) {
+            final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1;
+            resizeQueueArray(oldArraySize + grow, minRequiredNewSize);
+        }
+        // TODO implement shrinking as well?
+        return minRequiredNewSize;
+    }
+
+    /**
+     * This interface works similarly to {@link Comparable} and is used to prioritize between two
+     * objects. The main difference between this interface and {@link Comparable} is that it is
+     * not required to follow the usual contract between {@link Comparable#compareTo(Object)} and
+     * {@link Object#equals(Object)}. The contract of this interface is: When two objects are equal,
+     * they indicate the same priority, but indicating the same priority does not require that both
+     * objects are equal.
+     *
+     * @param <T> type of the compared objects.
+     */
+    interface PriorityComparator<T> {
+
+        /**
+         * Compares two objects for priority. Returns a negative integer, zero, or a positive
+         * integer as the first argument has lower, equal to, or higher priority than the second.
+         *
+         * @param left left operand in the comparison by priority.
+         * @param right right operand in the comparison by priority.
+         * @return a negative integer, zero, or a positive integer as the first argument has lower,
+         *     equal to, or higher priority than the second.
+         */
+        int comparePriority(T left, T right);
+    }
+
+    /**
+     * Interface for objects that can be managed by a {@link HeapPriorityQueue}. Such an object can
+     * only be contained in at most one {@link HeapPriorityQueue} at a time.
+     */
+    interface HeapPriorityQueueElement {
+
+        /**
+         * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained
+         * in and managed by any {@link HeapPriorityQueue}. We do not strictly enforce that internal
+         * indexes must be reset to this value when elements are removed from a {@link
+         * HeapPriorityQueue}.
+         */
+        int NOT_CONTAINED = Integer.MIN_VALUE;
+
+        /**
+         * Returns the current index of this object in the internal array of {@link
+         * HeapPriorityQueue}.
+         */
+        int getInternalIndex();
+
+        /**
+         * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be
+         * called by the owning {@link HeapPriorityQueue}.
+         *
+         * @param newIndex the new index in the queue's internal array.
+         */
+        void setInternalIndex(int newIndex);
+    }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
index c98befc311e..96cf0968af2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
@@ -20,10 +20,9 @@ package org.apache.flink.streaming.runtime.watermarkstatus;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java
new file mode 100644
index 00000000000..5bee93c66c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermarkstatus;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HeapPriorityQueue}. */
+class HeapPriorityQueueTest {
+    private static final HeapPriorityQueue.PriorityComparator<TestElement>
+            TEST_ELEMENT_PRIORITY_COMPARATOR =
+                    (left, right) -> Long.compare(left.getPriority(), right.getPriority());
+
+    @Test
+    void testPeekPollOrder() {
+        final int initialCapacity = 4;
+        final int testSize = 1000;
+        final Comparator<Long> comparator = getTestElementPriorityComparator();
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(initialCapacity);
+        HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+        insertRandomElements(priorityQueue, checkSet, testSize);
+
+        long lastPriorityValue = getHighestPriorityValueForComparator();
+        int lastSize = priorityQueue.size();
+        assertThat(testSize).isEqualTo(lastSize);
+        TestElement testElement;
+        while ((testElement = priorityQueue.peek()) != null) {
+            assertThat(priorityQueue.isEmpty()).isFalse();
+            assertThat(lastSize).isEqualTo(priorityQueue.size());
+            assertThat(testElement).isEqualTo(priorityQueue.poll());
+            assertThat(checkSet.remove(testElement)).isTrue();
+            assertThat(comparator.compare(testElement.getPriority(), lastPriorityValue) >= 0)
+                    .isTrue();
+            lastPriorityValue = testElement.getPriority();
+            --lastSize;
+        }
+
+        assertThat(priorityQueue.isEmpty()).isTrue();
+        assertThat(priorityQueue.size()).isZero();
+        assertThat(checkSet).isEmpty();
+    }
+
+    @Test
+    void testRemoveInsertMixKeepsOrder() {
+
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+        final Comparator<Long> comparator = getTestElementPriorityComparator();
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        final int testSize = 300;
+        final int addCounterMax = testSize / 4;
+        int iterationsTillNextAdds = random.nextInt(addCounterMax);
+        HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+        insertRandomElements(priorityQueue, checkSet, testSize);
+
+        // check that the whole set is still in order
+        while (!checkSet.isEmpty()) {
+
+            final long highestPrioValue = getHighestPriorityValueForComparator();
+
+            Iterator<TestElement> iterator = checkSet.iterator();
+            TestElement element = iterator.next();
+            iterator.remove();
+
+            final boolean removesHead = element.equals(priorityQueue.peek());
+
+            if (removesHead) {
+                assertThat(priorityQueue.remove(element)).isTrue();
+            } else {
+                priorityQueue.remove(element);
+            }
+
+            long currentPriorityWatermark;
+
+            // test some bulk polling from time to time
+            if (removesHead) {
+                currentPriorityWatermark = element.getPriority();
+            } else {
+                currentPriorityWatermark = highestPrioValue;
+            }
+
+            while ((element = priorityQueue.poll()) != null) {
+                assertThat(comparator.compare(element.getPriority(), currentPriorityWatermark) >= 0)
+                        .isTrue();
+                currentPriorityWatermark = element.getPriority();
+                if (--iterationsTillNextAdds == 0) {
+                    // some random adds
+                    iterationsTillNextAdds = random.nextInt(addCounterMax);
+                    insertRandomElements(
+                            priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3));
+                    currentPriorityWatermark = priorityQueue.peek().getPriority();
+                }
+            }
+
+            assertThat(priorityQueue.isEmpty()).isTrue();
+
+            checkSet.forEach(priorityQueue::add);
+        }
+    }
+
+    @Test
+    void testPoll() {
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+        final Comparator<Long> comparator = getTestElementPriorityComparator();
+
+        assertThat(priorityQueue.poll()).isNull();
+
+        final int testSize = 345;
+        HashSet<TestElement> checkSet = new HashSet<>(testSize);
+        insertRandomElements(priorityQueue, checkSet, testSize);
+
+        long lastPriorityValue = getHighestPriorityValueForComparator();
+        while (!priorityQueue.isEmpty()) {
+            TestElement removed = priorityQueue.poll();
+            assertThat(removed).isNotNull();
+            assertThat(checkSet.remove(removed)).isTrue();
+            assertThat(comparator.compare(removed.getPriority(), lastPriorityValue) >= 0).isTrue();
+            lastPriorityValue = removed.getPriority();
+        }
+        assertThat(checkSet).isEmpty();
+
+        assertThat(priorityQueue.poll()).isNull();
+    }
+
+    @Test
+    void testIsEmpty() {
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+        assertThat(priorityQueue.isEmpty()).isTrue();
+
+        assertThat(priorityQueue.add(new TestElement(4711L, 42L))).isTrue();
+        assertThat(priorityQueue.isEmpty()).isFalse();
+
+        priorityQueue.poll();
+        assertThat(priorityQueue.isEmpty()).isTrue();
+    }
+
+    @Test
+    void testAdd() {
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+        final List<TestElement> testElements =
+                Arrays.asList(new TestElement(4711L, 42L), new TestElement(815L, 23L));
+
+        testElements.sort(
+                (l, r) -> getTestElementPriorityComparator().compare(r.priority, l.priority));
+
+        assertThat(priorityQueue.add(testElements.get(0))).isTrue();
+        assertThat(priorityQueue.size()).isEqualTo(1);
+        assertThat(priorityQueue.add(testElements.get(1))).isTrue();
+        assertThat(priorityQueue.size()).isEqualTo(2);
+        assertThat(priorityQueue.poll()).isEqualTo(testElements.get(1));
+        assertThat(priorityQueue.size()).isEqualTo(1);
+        assertThat(priorityQueue.poll()).isEqualTo(testElements.get(0));
+        assertThat(priorityQueue.size()).isZero();
+    }
+
+    @Test
+    void testRemove() {
+        HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+        final long key = 4711L;
+        final long priorityValue = 42L;
+        final TestElement testElement = new TestElement(key, priorityValue);
+
+        assertThat(priorityQueue.add(testElement)).isTrue();
+        assertThat(priorityQueue.remove(testElement)).isTrue();
+        assertThat(priorityQueue.isEmpty()).isTrue();
+    }
+
+    @Test
+    void testClear() {
+        HeapPriorityQueue<TestElement> priorityQueueSet = newPriorityQueue(1);
+
+        int count = 10;
+        HashSet<TestElement> checkSet = new HashSet<>(count);
+        insertRandomElements(priorityQueueSet, checkSet, count);
+        assertThat(priorityQueueSet.size()).isEqualTo(count);
+        priorityQueueSet.clear();
+        assertThat(priorityQueueSet.size()).isZero();
+    }
+
+    private HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+        return new HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity);
+    }
+
+    private Comparator<Long> getTestElementPriorityComparator() {
+        return Long::compareTo;
+    }
+
+    private long getHighestPriorityValueForComparator() {
+        return getTestElementPriorityComparator().compare(-1L, 1L) > 0
+                ? Long.MAX_VALUE
+                : Long.MIN_VALUE;
+    }
+
+    private static void insertRandomElements(
+            HeapPriorityQueue<TestElement> priorityQueue, Set<TestElement> checkSet, int count) {
+
+        ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+        final int numUniqueKeys = Math.max(count / 4, 64);
+
+        long duplicatePriority = Long.MIN_VALUE;
+
+        final boolean checkEndSizes = priorityQueue.isEmpty();
+
+        for (int i = 0; i < count; ++i) {
+            TestElement element;
+            do {
+                long elementPriority;
+                if (duplicatePriority == Long.MIN_VALUE) {
+                    elementPriority = localRandom.nextLong();
+                } else {
+                    elementPriority = duplicatePriority;
+                    duplicatePriority = Long.MIN_VALUE;
+                }
+                element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+            } while (!checkSet.add(element));
+
+            if (localRandom.nextInt(10) == 0) {
+                duplicatePriority = element.getPriority();
+            }
+
+            final boolean headChangedIndicated = priorityQueue.add(element);
+            if (element.equals(priorityQueue.peek())) {
+                assertThat(headChangedIndicated).isTrue();
+            }
+        }
+
+        if (checkEndSizes) {
+            assertThat(count).isEqualTo(priorityQueue.size());
+        }
+    }
+
+    /** Payload for usage in the test. */
+    private static class TestElement implements HeapPriorityQueue.HeapPriorityQueueElement {
+
+        private final long key;
+        private final long priority;
+        private int internalIndex;
+
+        public TestElement(long key, long priority) {
+            this.key = key;
+            this.priority = priority;
+            this.internalIndex = NOT_CONTAINED;
+        }
+
+        public Long getKey() {
+            return key;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+
+        @Override
+        public int getInternalIndex() {
+            return internalIndex;
+        }
+
+        @Override
+        public void setInternalIndex(int newIndex) {
+            internalIndex = newIndex;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestElement that = (TestElement) o;
+            return key == that.key && priority == that.priority;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(getKey(), getPriority());
+        }
+
+        @Override
+        public String toString() {
+            return "TestElement{" + "key=" + key + ", priority=" + priority + '}';
+        }
+    }
+}