You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/30 16:46:41 UTC
[flink] branch master updated: [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 21e844b6c00 [FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.
21e844b6c00 is described below
commit 21e844b6c00b1796fdfc00136ca26d90e889b149
Author: Lijie Wang <wa...@gmail.com>
AuthorDate: Mon Jan 30 00:00:45 2023 +0800
[FLINK-30624][runtime] Introduce a new HeapPriorityQueue for StatusWatermarkValve to avoid affecting the performance of memory state backend.
This closes #21779.
---
.../runtime/watermarkstatus/HeapPriorityQueue.java | 294 +++++++++++++++++++
.../watermarkstatus/StatusWatermarkValve.java | 3 +-
.../watermarkstatus/HeapPriorityQueueTest.java | 316 +++++++++++++++++++++
3 files changed, 611 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java
new file mode 100644
index 00000000000..550012baea5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueue.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermarkstatus;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * This class provides functionality similar to {@link
+ * org.apache.flink.runtime.state.heap.HeapPriorityQueue}. It is introduced as a replacement for
+ * {@link org.apache.flink.runtime.state.heap.HeapPriorityQueue} to be used in {@link
+ * StatusWatermarkValve}, to avoid affecting the performance of the memory state backend.
+ *
+ * <p>Why reusing {@link org.apache.flink.runtime.state.heap.HeapPriorityQueue} would affect the
+ * performance of the memory state backend: in some scenarios, {@link
+ * org.apache.flink.runtime.state.heap.HeapPriorityQueueElement} has only one implementation
+ * (the one used by the memory state backend), which allows the JVM to inline its methods
+ * ({@code getInternalIndex}, {@code setInternalIndex}). Reusing it in {@link
+ * StatusWatermarkValve} would give it multiple implementations. Once there are multiple
+ * implementations, it becomes difficult for the JVM to inline these methods, which results in
+ * poorer performance of the memory state backend.
+ *
+ * @param <T> type of the contained elements.
+ */
+public class HeapPriorityQueue<T extends HeapPriorityQueue.HeapPriorityQueueElement> {
+
+ /** The index of the head element in the array that represents the heap. */
+ private static final int QUEUE_HEAD_INDEX = 1;
+
+ /** Comparator for the priority of contained elements. */
+ @Nonnull private final PriorityComparator<T> elementPriorityComparator;
+
+ /** The array that represents the heap-organized priority queue. */
+ @Nonnull private T[] queue;
+
+ /** The current size of the priority queue. */
+ @Nonnegative private int size;
+
+ /**
+ * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
+ *
+ * @param elementPriorityComparator comparator for the priority of contained elements.
+ * @param minimumCapacity the minimum and initial capacity of this priority queue.
+ */
+ @SuppressWarnings("unchecked")
+ public HeapPriorityQueue(
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnegative int minimumCapacity) {
+ this.queue = (T[]) new HeapPriorityQueueElement[getHeadElementIndex() + minimumCapacity];
+ this.size = 0;
+ this.elementPriorityComparator = elementPriorityComparator;
+ }
+
+ public void adjustModifiedElement(@Nonnull T element) {
+ final int elementIndex = element.getInternalIndex();
+ if (element == queue[elementIndex]) {
+ adjustElementAtIndex(element, elementIndex);
+ }
+ }
+
+ @Nullable
+ public T poll() {
+ return size() > 0 ? removeInternal(getHeadElementIndex()) : null;
+ }
+
+ @Nullable
+ public T peek() {
+ // References to removed elements are expected to be set to null.
+ return queue[getHeadElementIndex()];
+ }
+
+ public boolean add(@Nonnull T toAdd) {
+ addInternal(toAdd);
+ return toAdd.getInternalIndex() == getHeadElementIndex();
+ }
+
+ public boolean remove(@Nonnull T toRemove) {
+ final int elementIndex = toRemove.getInternalIndex();
+ removeInternal(elementIndex);
+ return elementIndex == getHeadElementIndex();
+ }
+
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ /** Clears the queue. */
+ public void clear() {
+ final int arrayOffset = getHeadElementIndex();
+ Arrays.fill(queue, arrayOffset, arrayOffset + size, null);
+ size = 0;
+ }
+
+ private void resizeQueueArray(int desiredSize, int minRequiredSize) {
+ if (isValidArraySize(desiredSize)) {
+ queue = Arrays.copyOf(queue, desiredSize);
+ } else if (isValidArraySize(minRequiredSize)) {
+ queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
+ } else {
+ throw new OutOfMemoryError(
+ "Required minimum heap size "
+ + minRequiredSize
+ + " exceeds maximum size of "
+ + MAX_ARRAY_SIZE
+ + ".");
+ }
+ }
+
+ private void moveElementToIdx(T element, int idx) {
+ queue[idx] = element;
+ element.setInternalIndex(idx);
+ }
+
+ private static boolean isValidArraySize(int size) {
+ return size >= 0 && size <= MAX_ARRAY_SIZE;
+ }
+
+ private int getHeadElementIndex() {
+ return QUEUE_HEAD_INDEX;
+ }
+
+ private void addInternal(@Nonnull T element) {
+ final int newSize = increaseSizeByOne();
+ moveElementToIdx(element, newSize);
+ siftUp(newSize);
+ }
+
+ private T removeInternal(int removeIdx) {
+ T[] heap = this.queue;
+ T removedValue = heap[removeIdx];
+
+ assert removedValue.getInternalIndex() == removeIdx;
+
+ final int oldSize = size;
+
+ if (removeIdx != oldSize) {
+ T element = heap[oldSize];
+ moveElementToIdx(element, removeIdx);
+ adjustElementAtIndex(element, removeIdx);
+ }
+
+ heap[oldSize] = null;
+
+ --size;
+ return removedValue;
+ }
+
+ private void adjustElementAtIndex(T element, int index) {
+ siftDown(index);
+ if (queue[index] == element) {
+ siftUp(index);
+ }
+ }
+
+ private void siftUp(int idx) {
+ final T[] heap = this.queue;
+ final T currentElement = heap[idx];
+ int parentIdx = idx >>> 1;
+
+ while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
+ moveElementToIdx(heap[parentIdx], idx);
+ idx = parentIdx;
+ parentIdx >>>= 1;
+ }
+
+ moveElementToIdx(currentElement, idx);
+ }
+
+ private void siftDown(int idx) {
+ final T[] heap = this.queue;
+ final int heapSize = this.size;
+
+ final T currentElement = heap[idx];
+ int firstChildIdx = idx << 1;
+ int secondChildIdx = firstChildIdx + 1;
+
+ if (isElementIndexValid(secondChildIdx, heapSize)
+ && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ firstChildIdx = secondChildIdx;
+ }
+
+ while (isElementIndexValid(firstChildIdx, heapSize)
+ && isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {
+ moveElementToIdx(heap[firstChildIdx], idx);
+ idx = firstChildIdx;
+ firstChildIdx = idx << 1;
+ secondChildIdx = firstChildIdx + 1;
+
+ if (isElementIndexValid(secondChildIdx, heapSize)
+ && isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
+ firstChildIdx = secondChildIdx;
+ }
+ }
+
+ moveElementToIdx(currentElement, idx);
+ }
+
+ private boolean isElementIndexValid(int elementIndex, int heapSize) {
+ return elementIndex <= heapSize;
+ }
+
+ private boolean isElementPriorityLessThen(T a, T b) {
+ return elementPriorityComparator.comparePriority(a, b) < 0;
+ }
+
+ private int increaseSizeByOne() {
+ final int oldArraySize = queue.length;
+ final int minRequiredNewSize = ++size;
+ if (minRequiredNewSize >= oldArraySize) {
+ final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1;
+ resizeQueueArray(oldArraySize + grow, minRequiredNewSize);
+ }
+ // TODO implement shrinking as well?
+ return minRequiredNewSize;
+ }
+
+ /**
+ * This interface works similarly to {@link Comparable} and is used to prioritize between two
+ * objects. The main difference between this interface and {@link Comparable} is that it is not
+ * required to follow the usual contract between {@link Comparable#compareTo(Object)} and
+ * {@link Object#equals(Object)}. The contract of this interface is: When two objects are equal,
+ * they indicate the same priority, but indicating the same priority does not require that both
+ * objects are equal.
+ *
+ * @param <T> type of the compared objects.
+ */
+ interface PriorityComparator<T> {
+
+ /**
+ * Compares two objects for priority. Returns a negative integer, zero, or a positive
+ * integer as the first argument has lower, equal to, or higher priority than the second.
+ *
+ * @param left left operand in the comparison by priority.
+ * @param right right operand in the comparison by priority.
+ * @return a negative integer, zero, or a positive integer as the first argument has lower,
+ * equal to, or higher priority than the second.
+ */
+ int comparePriority(T left, T right);
+ }
+
+ /**
+ * Interface for objects that can be managed by a {@link HeapPriorityQueue}. Such an object can
+ * only be contained in at most one {@link HeapPriorityQueue} at a time.
+ */
+ interface HeapPriorityQueueElement {
+
+ /**
+ * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained
+ * in and managed by any {@link HeapPriorityQueue}. We do not strictly enforce that internal
+ * indexes must be reset to this value when elements are removed from a {@link
+ * HeapPriorityQueue}.
+ */
+ int NOT_CONTAINED = Integer.MIN_VALUE;
+
+ /**
+ * Returns the current index of this object in the internal array of {@link
+ * HeapPriorityQueue}.
+ */
+ int getInternalIndex();
+
+ /**
+ * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be
+ * called by the owning {@link HeapPriorityQueue}.
+ *
+ * @param newIndex the new index in the heap array.
+ */
+ void setInternalIndex(int newIndex);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
index c98befc311e..96cf0968af2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValve.java
@@ -20,10 +20,9 @@ package org.apache.flink.streaming.runtime.watermarkstatus;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
-import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement;
import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java
new file mode 100644
index 00000000000..5bee93c66c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/watermarkstatus/HeapPriorityQueueTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermarkstatus;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HeapPriorityQueue}. */
+class HeapPriorityQueueTest {
+ private static final HeapPriorityQueue.PriorityComparator<TestElement>
+ TEST_ELEMENT_PRIORITY_COMPARATOR =
+ (left, right) -> Long.compare(left.getPriority(), right.getPriority());
+
+ @Test
+ void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ final Comparator<Long> comparator = getTestElementPriorityComparator();
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomElements(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = getHighestPriorityValueForComparator();
+ int lastSize = priorityQueue.size();
+ assertThat(testSize).isEqualTo(lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ assertThat(priorityQueue.isEmpty()).isFalse();
+ assertThat(lastSize).isEqualTo(priorityQueue.size());
+ assertThat(testElement).isEqualTo(priorityQueue.poll());
+ assertThat(checkSet.remove(testElement)).isTrue();
+ assertThat(comparator.compare(testElement.getPriority(), lastPriorityValue) >= 0)
+ .isTrue();
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ assertThat(priorityQueue.isEmpty()).isTrue();
+ assertThat(priorityQueue.size()).isZero();
+ assertThat(checkSet).isEmpty();
+ }
+
+ @Test
+ void testRemoveInsertMixKeepsOrder() {
+
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+ final Comparator<Long> comparator = getTestElementPriorityComparator();
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ final int testSize = 300;
+ final int addCounterMax = testSize / 4;
+ int iterationsTillNextAdds = random.nextInt(addCounterMax);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomElements(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ final long highestPrioValue = getHighestPriorityValueForComparator();
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ final boolean removesHead = element.equals(priorityQueue.peek());
+
+ if (removesHead) {
+ assertThat(priorityQueue.remove(element)).isTrue();
+ } else {
+ priorityQueue.remove(element);
+ }
+
+ long currentPriorityWatermark;
+
+ // test some bulk polling from time to time
+ if (removesHead) {
+ currentPriorityWatermark = element.getPriority();
+ } else {
+ currentPriorityWatermark = highestPrioValue;
+ }
+
+ while ((element = priorityQueue.poll()) != null) {
+ assertThat(comparator.compare(element.getPriority(), currentPriorityWatermark) >= 0)
+ .isTrue();
+ currentPriorityWatermark = element.getPriority();
+ if (--iterationsTillNextAdds == 0) {
+ // some random adds
+ iterationsTillNextAdds = random.nextInt(addCounterMax);
+ insertRandomElements(
+ priorityQueue, new HashSet<>(checkSet), 1 + random.nextInt(3));
+ currentPriorityWatermark = priorityQueue.peek().getPriority();
+ }
+ }
+
+ assertThat(priorityQueue.isEmpty()).isTrue();
+
+ checkSet.forEach(priorityQueue::add);
+ }
+ }
+
+ @Test
+ void testPoll() {
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
+ final Comparator<Long> comparator = getTestElementPriorityComparator();
+
+ assertThat(priorityQueue.poll()).isNull();
+
+ final int testSize = 345;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+ insertRandomElements(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = getHighestPriorityValueForComparator();
+ while (!priorityQueue.isEmpty()) {
+ TestElement removed = priorityQueue.poll();
+ assertThat(removed).isNotNull();
+ assertThat(checkSet.remove(removed)).isTrue();
+ assertThat(comparator.compare(removed.getPriority(), lastPriorityValue) >= 0).isTrue();
+ lastPriorityValue = removed.getPriority();
+ }
+ assertThat(checkSet).isEmpty();
+
+ assertThat(priorityQueue.poll()).isNull();
+ }
+
+ @Test
+ void testIsEmpty() {
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+ assertThat(priorityQueue.isEmpty()).isTrue();
+
+ assertThat(priorityQueue.add(new TestElement(4711L, 42L))).isTrue();
+ assertThat(priorityQueue.isEmpty()).isFalse();
+
+ priorityQueue.poll();
+ assertThat(priorityQueue.isEmpty()).isTrue();
+ }
+
+ @Test
+ void testAdd() {
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+ final List<TestElement> testElements =
+ Arrays.asList(new TestElement(4711L, 42L), new TestElement(815L, 23L));
+
+ testElements.sort(
+ (l, r) -> getTestElementPriorityComparator().compare(r.priority, l.priority));
+
+ assertThat(priorityQueue.add(testElements.get(0))).isTrue();
+ assertThat(priorityQueue.size()).isEqualTo(1);
+ assertThat(priorityQueue.add(testElements.get(1))).isTrue();
+ assertThat(priorityQueue.size()).isEqualTo(2);
+ assertThat(priorityQueue.poll()).isEqualTo(testElements.get(1));
+ assertThat(priorityQueue.size()).isEqualTo(1);
+ assertThat(priorityQueue.poll()).isEqualTo(testElements.get(0));
+ assertThat(priorityQueue.size()).isZero();
+ }
+
+ @Test
+ void testRemove() {
+ HeapPriorityQueue<TestElement> priorityQueue = newPriorityQueue(1);
+
+ final long key = 4711L;
+ final long priorityValue = 42L;
+ final TestElement testElement = new TestElement(key, priorityValue);
+
+ assertThat(priorityQueue.add(testElement)).isTrue();
+ assertThat(priorityQueue.remove(testElement)).isTrue();
+ assertThat(priorityQueue.isEmpty()).isTrue();
+ }
+
+ @Test
+ void testClear() {
+ HeapPriorityQueue<TestElement> priorityQueueSet = newPriorityQueue(1);
+
+ int count = 10;
+ HashSet<TestElement> checkSet = new HashSet<>(count);
+ insertRandomElements(priorityQueueSet, checkSet, count);
+ assertThat(priorityQueueSet.size()).isEqualTo(count);
+ priorityQueueSet.clear();
+ assertThat(priorityQueueSet.size()).isZero();
+ }
+
+ private HeapPriorityQueue<TestElement> newPriorityQueue(int initialCapacity) {
+ return new HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity);
+ }
+
+ private Comparator<Long> getTestElementPriorityComparator() {
+ return Long::compareTo;
+ }
+
+ private long getHighestPriorityValueForComparator() {
+ return getTestElementPriorityComparator().compare(-1L, 1L) > 0
+ ? Long.MAX_VALUE
+ : Long.MIN_VALUE;
+ }
+
+ private static void insertRandomElements(
+ HeapPriorityQueue<TestElement> priorityQueue, Set<TestElement> checkSet, int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ final boolean checkEndSizes = priorityQueue.isEmpty();
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority = localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated = priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ assertThat(headChangedIndicated).isTrue();
+ }
+ }
+
+ if (checkEndSizes) {
+ assertThat(count).isEqualTo(priorityQueue.size());
+ }
+ }
+
+ /** Payload for usage in the test. */
+ private static class TestElement implements HeapPriorityQueue.HeapPriorityQueueElement {
+
+ private final long key;
+ private final long priority;
+ private int internalIndex;
+
+ public TestElement(long key, long priority) {
+ this.key = key;
+ this.priority = priority;
+ this.internalIndex = NOT_CONTAINED;
+ }
+
+ public Long getKey() {
+ return key;
+ }
+
+ public long getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int getInternalIndex() {
+ return internalIndex;
+ }
+
+ @Override
+ public void setInternalIndex(int newIndex) {
+ internalIndex = newIndex;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestElement that = (TestElement) o;
+ return key == that.key && priority == that.priority;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getKey(), getPriority());
+ }
+
+ @Override
+ public String toString() {
+ return "TestElement{" + "key=" + key + ", priority=" + priority + '}';
+ }
+ }
+}