You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/12 23:13:06 UTC
[pulsar] branch master updated: [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 09f43337831 [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
09f43337831 is described below
commit 09f43337831cec681b8d7cdaab375383ca63d16b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 12 16:12:57 2022 -0700
[fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
* Fixed error when delayed messages trackers state grows to >1.5GB
* Fixed spotbugs issues
* Fixed javadocs
* In the constructor, ensure all segments after the first one are of max size
* Use poll to figure out where the test is stuck
* Added SegmentedLongArray specific unit test
* Removed unused imports
---
.../PersistentDispatcherFailoverConsumerTest.java | 26 ++-
.../util/collections/SegmentedLongArray.java | 128 ++++++++++++++
.../util/collections/TripleLongPriorityQueue.java | 187 ++++++++++-----------
.../util/collections/SegmentedLongArrayTest.java | 103 ++++++++++++
.../collections/TripleLongPriorityQueueTest.java | 41 ++++-
5 files changed, 369 insertions(+), 116 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 4e2e0323884..ec4ed02e35d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertSame;
@@ -47,6 +48,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -360,7 +362,8 @@ public class PersistentDispatcherFailoverConsumerTest {
// 4. Verify active consumer
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
// get the notified with who is the leader
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
@@ -372,7 +375,8 @@ public class PersistentDispatcherFailoverConsumerTest {
assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
assertEquals(3, consumers.size());
// get notified with who is the leader
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -387,13 +391,17 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(4, consumers.size());
// all consumers will receive notifications
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 0, true);
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 1, false);
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 2, false);
verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
@@ -419,9 +427,11 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(2, consumers.size());
// the remaining consumers will receive notifications
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
- change = consumerChanges.take();
+ change = consumerChanges.poll(10, TimeUnit.SECONDS);
+ assertNotNull(change);
verifyActiveConsumerChange(change, 1, true);
// 10. Attempt to remove already removed consumer
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
new file mode 100644
index 00000000000..dc4d1c4908c
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+
+/**
+ * A growable array of {@code long} values kept in pooled direct memory.
+ *
+ * <p>The values are spread over a list of {@link ByteBuf} segments, each holding at most
+ * {@code MAX_SEGMENT_SIZE} longs. This lets the array be addressed with {@code long} offsets
+ * while every individual {@link ByteBuf} is only addressed with {@code int} indexes.
+ *
+ * <p>All offsets and capacities exposed by this class are expressed in number of longs, not in
+ * bytes, with the exception of {@link #bytesCapacity()}.
+ *
+ * <p>Instances own direct memory and must be released with {@link #close()}. Using an instance
+ * after it has been closed is not supported.
+ */
+@NotThreadSafe
+public class SegmentedLongArray implements AutoCloseable {
+
+    private static final int SIZE_OF_LONG = 8;
+
+    // Maximum number of longs stored in a single segment
+    private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
+    private static final int MAX_SEGMENT_BYTES = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
+
+    private final List<ByteBuf> buffers = new ArrayList<>();
+
+    @Getter
+    private final long initialCapacity;
+
+    @Getter
+    private long capacity;
+
+    /**
+     * Create a new array able to hold {@code initialCapacity} longs.
+     *
+     * <p>The first segment is sized to {@code min(initialCapacity, MAX_SEGMENT_SIZE)}; every
+     * additional segment is allocated at full segment size, so that offsets can always be mapped
+     * to a segment with a plain division. The reported capacity is still {@code initialCapacity},
+     * even when the last segment is bigger than strictly needed.
+     *
+     * @param initialCapacity the number of longs the array can hold initially; the array is never
+     *                        shrunk below this value
+     */
+    public SegmentedLongArray(long initialCapacity) {
+        long remainingToAdd = initialCapacity;
+
+        try {
+            // The first segment is the only one that can later be resized
+            int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
+            buffers.add(allocateResizableSegment(sizeToAdd * SIZE_OF_LONG));
+            remainingToAdd -= sizeToAdd;
+
+            // Add the remaining segments, all at full segment size, if necessary
+            while (remainingToAdd > 0) {
+                buffers.add(allocateFullSegment());
+                remainingToAdd -= MAX_SEGMENT_SIZE;
+            }
+        } catch (RuntimeException | Error e) {
+            // Do not leak the segments that were already allocated if a later allocation fails
+            releaseAll();
+            throw e;
+        }
+
+        this.initialCapacity = initialCapacity;
+        this.capacity = initialCapacity;
+    }
+
+    /**
+     * Store a value at the given position.
+     *
+     * <p>The offset is not validated against {@link #getCapacity()}; it only has to fall inside
+     * an allocated segment.
+     *
+     * @param offset the position in the array, in number of longs
+     * @param value  the value to store
+     * @throws IndexOutOfBoundsException if the offset does not fall inside an allocated segment
+     */
+    public void writeLong(long offset, long value) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
+    }
+
+    /**
+     * Read the value stored at the given position.
+     *
+     * <p>The offset is not validated against {@link #getCapacity()}; it only has to fall inside
+     * an allocated segment.
+     *
+     * @param offset the position in the array, in number of longs
+     * @return the value found at that position
+     * @throws IndexOutOfBoundsException if the offset does not fall inside an allocated segment
+     */
+    public long readLong(long offset) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
+    }
+
+    /**
+     * Grow the array by one step.
+     *
+     * <p>While the array is smaller than one full segment, the first segment is resized in place:
+     * the capacity is doubled up to 256 longs and increased by 50% after that, capped at the
+     * segment size. Once the first segment is full, each call appends one more full segment.
+     */
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {
+            // Resize the current buffer to bigger capacity. Always grow by at least 1 long,
+            // otherwise an array created with capacity 0 would never be able to grow.
+            long growth = Math.max(1, capacity <= 256 ? capacity : capacity / 2);
+            long newCapacity = Math.min(capacity + growth, MAX_SEGMENT_SIZE);
+            int newCapacityBytes = (int) newCapacity * SIZE_OF_LONG;
+
+            ByteBuf first = buffers.get(0);
+            first.capacity(newCapacityBytes);
+            first.writerIndex(newCapacityBytes);
+
+            // Only account for the new capacity once the buffer was successfully resized
+            capacity = newCapacity;
+        } else {
+            // Let's add 1 more buffer to the list
+            buffers.add(allocateFullSegment());
+            capacity += MAX_SEGMENT_SIZE;
+        }
+    }
+
+    /**
+     * Try to reduce the capacity of the array to {@code newCapacity} longs.
+     *
+     * <p>The request is ignored when {@code newCapacity} is not smaller than the current capacity
+     * or when it is smaller than the initial capacity. When the array has more than one segment,
+     * memory is only given back in units of whole segments, so the resulting capacity can be
+     * bigger than {@code newCapacity}; callers should check {@link #getCapacity()} afterwards.
+     *
+     * @param newCapacity the desired capacity, in number of longs
+     */
+    public void shrink(long newCapacity) {
+        if (newCapacity >= capacity || newCapacity < initialCapacity) {
+            return;
+        }
+
+        long sizeToReduce = capacity - newCapacity;
+
+        // Drop whole trailing segments, always keeping the first one
+        while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) {
+            buffers.remove(buffers.size() - 1).release();
+            capacity -= MAX_SEGMENT_SIZE;
+            sizeToReduce -= MAX_SEGMENT_SIZE;
+        }
+
+        if (buffers.size() == 1 && sizeToReduce > 0) {
+            // We should also reduce the capacity of the first buffer, by copying the part to
+            // retain into a smaller buffer
+            long reducedCapacity = capacity - sizeToReduce;
+            int reducedBytes = (int) reducedCapacity * SIZE_OF_LONG;
+
+            ByteBuf oldBuffer = buffers.get(0);
+            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(reducedBytes);
+            // This also advances the writer index of newBuffer by the amount of copied bytes
+            oldBuffer.getBytes(0, newBuffer, reducedBytes);
+            buffers.set(0, newBuffer);
+            oldBuffer.release();
+            capacity = reducedCapacity;
+        }
+    }
+
+    /**
+     * Release all the direct memory held by the array.
+     *
+     * <p>Calling this method more than once has no further effect.
+     */
+    @Override
+    public void close() {
+        releaseAll();
+    }
+
+    /**
+     * The amount of memory used to back the array of longs.
+     */
+    public long bytesCapacity() {
+        return capacity * SIZE_OF_LONG;
+    }
+
+    // Allocate a segment of the given size in bytes, which can be resized later
+    private static ByteBuf allocateResizableSegment(int sizeInBytes) {
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeInBytes);
+        buffer.writerIndex(sizeInBytes);
+        return buffer;
+    }
+
+    // Allocate a segment of the maximum size, which will never be resized
+    private static ByteBuf allocateFullSegment() {
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_BYTES, MAX_SEGMENT_BYTES);
+        buffer.writerIndex(MAX_SEGMENT_BYTES);
+        return buffer;
+    }
+
+    // Release every segment and forget about it, so that a buffer is never released twice
+    private void releaseAll() {
+        buffers.forEach(ByteBuf::release);
+        buffers.clear();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 487c2284cff..50288247c64 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,9 +19,6 @@
package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
/**
* Provides a priority-queue implementation specialized on items composed by 3 longs.
@@ -29,33 +26,28 @@ import io.netty.buffer.PooledByteBufAllocator;
* <p>This class is not thread safe and the items are stored in direct memory.
*/
public class TripleLongPriorityQueue implements AutoCloseable {
-
- private static final int SIZE_OF_LONG = 8;
private static final int DEFAULT_INITIAL_CAPACITY = 16;
private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
// Each item is composed of 3 longs
private static final int ITEMS_COUNT = 3;
- private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
-
/**
* Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
*/
private static final float RESERVATION_FACTOR = 0.9f;
- private ByteBuf buffer;
+ private final SegmentedLongArray array;
- private final int initialCapacity;
+ // Count of how many (long,long,long) tuples are currently inserted
+ private long tuplesCount;
- private int capacity;
- private int size;
/**
* When size < capacity * shrinkFactor, may trigger shrinking.
*/
private final float shrinkFactor;
- private float shrinkThreshold;
+ private long shrinkThreshold;
/**
* Create a new priority queue with default initial capacity.
@@ -64,13 +56,12 @@ public class TripleLongPriorityQueue implements AutoCloseable {
this(DEFAULT_INITIAL_CAPACITY);
}
- public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+ public TripleLongPriorityQueue(long initialCapacity, float shrinkFactor) {
+ checkArgument(initialCapacity > 0);
checkArgument(shrinkFactor > 0);
- this.initialCapacity = initialCapacity;
- this.capacity = initialCapacity;
- this.shrinkThreshold = this.capacity * shrinkFactor;
- this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
- this.size = 0;
+ this.array = new SegmentedLongArray(initialCapacity * ITEMS_COUNT);
+ this.tuplesCount = 0;
+ this.shrinkThreshold = (long) (initialCapacity * shrinkFactor);
this.shrinkFactor = shrinkFactor;
}
@@ -87,7 +78,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
*/
@Override
public void close() {
- buffer.release();
+ array.close();
}
/**
@@ -98,13 +89,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
* @param n3
*/
public void add(long n1, long n2, long n3) {
- if (size == capacity) {
- increaseCapacity();
+ long arrayIdx = tuplesCount * ITEMS_COUNT;
+ if ((arrayIdx + 2) >= array.getCapacity()) {
+ array.increaseCapacity();
}
- put(size, n1, n2, n3);
- siftUp(size);
- ++size;
+ put(tuplesCount, n1, n2, n3);
+ siftUp(tuplesCount);
+ ++tuplesCount;
}
/**
@@ -113,8 +105,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
* <p>The tuple will not be extracted
*/
public long peekN1() {
- checkArgument(size != 0);
- return buffer.getLong(0);
+ checkArgument(tuplesCount != 0);
+ return array.readLong(0);
}
/**
@@ -123,8 +115,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
* <p>The tuple will not be extracted
*/
public long peekN2() {
- checkArgument(size != 0);
- return buffer.getLong(0 + 1 * SIZE_OF_LONG);
+ checkArgument(tuplesCount != 0);
+ return array.readLong(1);
}
/**
@@ -133,17 +125,17 @@ public class TripleLongPriorityQueue implements AutoCloseable {
* <p>The tuple will not be extracted
*/
public long peekN3() {
- checkArgument(size != 0);
- return buffer.getLong(0 + 2 * SIZE_OF_LONG);
+ checkArgument(tuplesCount != 0);
+ return array.readLong(2);
}
/**
* Removes the first item from the queue.
*/
public void pop() {
- checkArgument(size != 0);
- swap(0, size - 1);
- size--;
+ checkArgument(tuplesCount != 0);
+ swap(0, tuplesCount - 1);
+ tuplesCount--;
siftDown(0);
shrinkCapacity();
}
@@ -152,132 +144,125 @@ public class TripleLongPriorityQueue implements AutoCloseable {
* Returns whether the priority queue is empty.
*/
public boolean isEmpty() {
- return size == 0;
+ return tuplesCount == 0;
}
/**
* Returns the number of tuples in the priority queue.
*/
- public int size() {
- return size;
+ public long size() {
+ return tuplesCount;
+ }
+
+ /**
+ * The amount of memory used to back the priority queue.
+ */
+ public long bytesCapacity() {
+ return array.bytesCapacity();
}
/**
* Clear all items.
*/
public void clear() {
- this.buffer.clear();
- this.size = 0;
+ this.tuplesCount = 0;
shrinkCapacity();
}
- private void increaseCapacity() {
- // For bigger sizes, increase by 50%
- this.capacity += (capacity <= 256 ? capacity : capacity / 2);
- this.shrinkThreshold = this.capacity * shrinkFactor;
- buffer.capacity(this.capacity * TUPLE_SIZE);
- }
-
private void shrinkCapacity() {
- if (capacity > initialCapacity && size < shrinkThreshold) {
- int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
- if (decreasingSize <= 0) {
+ if (tuplesCount <= shrinkThreshold && array.getCapacity() > array.getInitialCapacity()) {
+ long sizeToShrink = (long) (array.getCapacity() * shrinkFactor * RESERVATION_FACTOR);
+ if (sizeToShrink == 0) {
return;
}
- if (capacity - decreasingSize <= initialCapacity) {
- this.capacity = initialCapacity;
+
+ long newCapacity;
+ if (array.getCapacity() - sizeToShrink <= array.getInitialCapacity()) {
+ newCapacity = array.getInitialCapacity();
} else {
- this.capacity = capacity - decreasingSize;
+ newCapacity = array.getCapacity() - sizeToShrink;
}
- this.shrinkThreshold = this.capacity * shrinkFactor;
- ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
- buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
- buffer.release();
- this.buffer = newBuffer;
+ array.shrink(newCapacity);
+ this.shrinkThreshold = (long) (array.getCapacity() / (double) ITEMS_COUNT * shrinkFactor);
}
}
- private void siftUp(int idx) {
- while (idx > 0) {
- int parentIdx = (idx - 1) / 2;
- if (compare(idx, parentIdx) >= 0) {
+ private void siftUp(long tupleIdx) {
+ while (tupleIdx > 0) {
+ long parentIdx = (tupleIdx - 1) / 2;
+ if (compare(tupleIdx, parentIdx) >= 0) {
break;
}
- swap(idx, parentIdx);
- idx = parentIdx;
+ swap(tupleIdx, parentIdx);
+ tupleIdx = parentIdx;
}
}
- private void siftDown(int idx) {
- int half = size / 2;
- while (idx < half) {
- int left = 2 * idx + 1;
- int right = 2 * idx + 2;
+ private void siftDown(long tupleIdx) {
+ long half = tuplesCount / 2;
+ while (tupleIdx < half) {
+ long left = 2 * tupleIdx + 1;
+ long right = 2 * tupleIdx + 2;
- int swapIdx = idx;
+ long swapIdx = tupleIdx;
- if (compare(idx, left) > 0) {
+ if (compare(tupleIdx, left) > 0) {
swapIdx = left;
}
- if (right < size && compare(swapIdx, right) > 0) {
+ if (right < tuplesCount && compare(swapIdx, right) > 0) {
swapIdx = right;
}
- if (swapIdx == idx) {
+ if (swapIdx == tupleIdx) {
return;
}
- swap(idx, swapIdx);
- idx = swapIdx;
+ swap(tupleIdx, swapIdx);
+ tupleIdx = swapIdx;
}
}
- private void put(int idx, long n1, long n2, long n3) {
- int i = idx * TUPLE_SIZE;
- buffer.setLong(i, n1);
- buffer.setLong(i + 1 * SIZE_OF_LONG, n2);
- buffer.setLong(i + 2 * SIZE_OF_LONG, n3);
+ private void put(long tupleIdx, long n1, long n2, long n3) {
+ long idx = tupleIdx * ITEMS_COUNT;
+ array.writeLong(idx, n1);
+ array.writeLong(idx + 1, n2);
+ array.writeLong(idx + 2, n3);
}
- private int compare(int idx1, int idx2) {
- int i1 = idx1 * TUPLE_SIZE;
- int i2 = idx2 * TUPLE_SIZE;
+ private int compare(long tupleIdx1, long tupleIdx2) {
+ long idx1 = tupleIdx1 * ITEMS_COUNT;
+ long idx2 = tupleIdx2 * ITEMS_COUNT;
- int c1 = Long.compare(buffer.getLong(i1), buffer.getLong(i2));
+ int c1 = Long.compare(array.readLong(idx1), array.readLong(idx2));
if (c1 != 0) {
return c1;
}
- int c2 = Long.compare(buffer.getLong(i1 + SIZE_OF_LONG), buffer.getLong(i2 + SIZE_OF_LONG));
+ int c2 = Long.compare(array.readLong(idx1 + 1), array.readLong(idx2 + 1));
if (c2 != 0) {
return c2;
}
- return Long.compare(buffer.getLong(i1 + 2 * SIZE_OF_LONG), buffer.getLong(i2 + 2 * SIZE_OF_LONG));
+ return Long.compare(array.readLong(idx1 + 2), array.readLong(idx2 + 2));
}
- private void swap(int idx1, int idx2) {
- int i1 = idx1 * TUPLE_SIZE;
- int i2 = idx2 * TUPLE_SIZE;
+ private void swap(long tupleIdx1, long tupleIdx2) {
+ long idx1 = tupleIdx1 * ITEMS_COUNT;
+ long idx2 = tupleIdx2 * ITEMS_COUNT;
- long tmp1 = buffer.getLong(i1);
- long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG);
- long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG);
+ long tmp1 = array.readLong(idx1);
+ long tmp2 = array.readLong(idx1 + 1);
+ long tmp3 = array.readLong(idx1 + 2);
- buffer.setLong(i1, buffer.getLong(i2));
- buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG));
- buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG));
-
- buffer.setLong(i2, tmp1);
- buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
- buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
- }
+ array.writeLong(idx1, array.readLong(idx2));
+ array.writeLong(idx1 + 1, array.readLong(idx2 + 1));
+ array.writeLong(idx1 + 2, array.readLong(idx2 + 2));
- @VisibleForTesting
- ByteBuf getBuffer() {
- return buffer;
+ array.writeLong(idx2, tmp1);
+ array.writeLong(idx2 + 1, tmp2);
+ array.writeLong(idx2 + 2, tmp3);
}
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
new file mode 100644
index 00000000000..efb86fd4f9d
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import lombok.Cleanup;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link SegmentedLongArray}, covering both an array that fits in its first
+ * segment and one that spans multiple segments.
+ */
+public class SegmentedLongArrayTest {
+
+    /**
+     * Exercise a small array: writes, out-of-bounds detection, growing and shrinking.
+     */
+    @Test
+    public void testArray() {
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(4);
+        assertEquals(a.getCapacity(), 4);
+        assertEquals(a.bytesCapacity(), 4 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        a.writeLong(0, 0);
+        a.writeLong(1, 1);
+        a.writeLong(2, 2);
+        a.writeLong(3, Long.MAX_VALUE);
+
+        // Writing past the end of the only segment must be rejected
+        try {
+            a.writeLong(4, Long.MIN_VALUE);
+            fail("should have failed");
+        } catch (IndexOutOfBoundsException e) {
+            // Expected
+        }
+
+        a.increaseCapacity();
+
+        a.writeLong(4, Long.MIN_VALUE);
+
+        assertEquals(a.getCapacity(), 8);
+        assertEquals(a.bytesCapacity(), 8 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        assertEquals(a.readLong(0), 0);
+        assertEquals(a.readLong(1), 1);
+        assertEquals(a.readLong(2), 2);
+        assertEquals(a.readLong(3), Long.MAX_VALUE);
+        assertEquals(a.readLong(4), Long.MIN_VALUE);
+
+        a.shrink(5);
+        assertEquals(a.getCapacity(), 5);
+        assertEquals(a.bytesCapacity(), 5 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        // Shrinking a single-segment array copies the data into a new buffer: the values that
+        // are retained must have survived the copy
+        assertEquals(a.readLong(0), 0);
+        assertEquals(a.readLong(1), 1);
+        assertEquals(a.readLong(2), 2);
+        assertEquals(a.readLong(3), Long.MAX_VALUE);
+        assertEquals(a.readLong(4), Long.MIN_VALUE);
+    }
+
+    /**
+     * Exercise an array whose initial capacity is bigger than a single segment.
+     */
+    @Test
+    public void testLargeArray() {
+        long initialCap = 3 * 1024 * 1024;
+
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(initialCap);
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        // These offsets are stored in the second segment
+        long baseOffset = initialCap - 100;
+
+        a.writeLong(baseOffset, 0);
+        a.writeLong(baseOffset + 1, 1);
+        a.writeLong(baseOffset + 2, 2);
+        a.writeLong(baseOffset + 3, Long.MAX_VALUE);
+        a.writeLong(baseOffset + 4, Long.MIN_VALUE);
+
+        // Growing a multi-segment array adds one full segment (2M longs)
+        a.increaseCapacity();
+
+        assertEquals(a.getCapacity(), 5 * 1024 * 1024);
+        assertEquals(a.bytesCapacity(), 5 * 1024 * 1024 * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        assertEquals(a.readLong(baseOffset), 0);
+        assertEquals(a.readLong(baseOffset + 1), 1);
+        assertEquals(a.readLong(baseOffset + 2), 2);
+        assertEquals(a.readLong(baseOffset + 3), Long.MAX_VALUE);
+        assertEquals(a.readLong(baseOffset + 4), Long.MIN_VALUE);
+
+        a.shrink(initialCap);
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        // Dropping the trailing segment must not affect the values stored before it
+        assertEquals(a.readLong(baseOffset), 0);
+        assertEquals(a.readLong(baseOffset + 1), 1);
+        assertEquals(a.readLong(baseOffset + 2), 2);
+        assertEquals(a.readLong(baseOffset + 3), Long.MAX_VALUE);
+        assertEquals(a.readLong(baseOffset + 4), Long.MIN_VALUE);
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index bd3aef86ad1..de3b1adb792 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -54,6 +54,35 @@ public class TripleLongPriorityQueueTest {
pq.close();
}
+    /**
+     * Fill the queue with 3 million tuples inserted in reverse order and verify they are
+     * extracted in ascending order. That is 9 million longs, which is more than the
+     * 2M longs a single segment of the backing array can hold.
+     */
+    @Test
+    public void testLargeQueue() {
+        final int N = 3_000_000;
+
+        TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
+        try {
+            assertEquals(pq.size(), 0);
+
+            for (int i = N; i > 0; i--) {
+                pq.add(i, i * 2L, i * 3L);
+            }
+
+            assertEquals(pq.size(), N);
+            assertFalse(pq.isEmpty());
+
+            for (int i = 1; i <= N; i++) {
+                // Compare against the same long expressions that were used when inserting
+                assertEquals(pq.peekN1(), i);
+                assertEquals(pq.peekN2(), i * 2L);
+                assertEquals(pq.peekN3(), i * 3L);
+
+                pq.pop();
+
+                assertEquals(pq.size(), N - i);
+            }
+
+            pq.clear();
+        } finally {
+            // Release the direct memory even when one of the assertions above fails
+            pq.close();
+        }
+    }
+
+
@Test
public void testCheckForEmpty() {
TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
@@ -143,26 +172,24 @@ public class TripleLongPriorityQueueTest {
TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
pq.add(0, 0, 0);
assertEquals(pq.size(), 1);
- assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+ assertEquals(pq.bytesCapacity(), initialCapacity * tupleSize);
// Scale out to capacity * 2
triggerScaleOut(initialCapacity, pq);
int scaleCapacity = initialCapacity * 2;
- assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+ assertEquals(pq.bytesCapacity(), scaleCapacity * tupleSize);
// Trigger shrinking
- for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+ for (int i = 0; i < initialCapacity / 2 + 2; i++) {
pq.pop();
}
- int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
- assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+ int capacity = scaleCapacity - (int)((scaleCapacity ) * 0.5f * 0.9f);
+ assertTrue(pq.bytesCapacity() < scaleCapacity * tupleSize);
// Scale out to capacity * 2
triggerScaleOut(initialCapacity, pq);
scaleCapacity = capacity * 2;
- assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
// Trigger shrinking
pq.clear();
capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
- assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
}
private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {