Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/12 23:13:06 UTC

[pulsar] branch master updated: [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f43337831 [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
09f43337831 is described below

commit 09f43337831cec681b8d7cdaab375383ca63d16b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 12 16:12:57 2022 -0700

    [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
    
    * Fixed error when delayed messages trackers state grows to >1.5GB
    
    * Fixed spotbugs issues
    
    * Fixed javadocs
    
    * In the constructor, ensure all segments after the first one are of max size
    
    * Use poll to figure out where the test is stuck
    
    * Added SegmentedLongArray specific unit test
    
    * Removed unused imports
---
 .../PersistentDispatcherFailoverConsumerTest.java  |  26 ++-
 .../util/collections/SegmentedLongArray.java       | 128 ++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  | 187 ++++++++++-----------
 .../util/collections/SegmentedLongArrayTest.java   | 103 ++++++++++++
 .../collections/TripleLongPriorityQueueTest.java   |  41 ++++-
 5 files changed, 369 insertions(+), 116 deletions(-)
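For context on the ">1.5GB" threshold: before this change, TripleLongPriorityQueue kept all tuples in a single Netty ByteBuf, and a ByteBuf's capacity is an int, so it cannot exceed Integer.MAX_VALUE (about 2 GiB) bytes. With the old 50% growth step (visible in the removed increaseCapacity() further below) and 24-byte tuples, a queue holding roughly 1.5 GB of tuple data would attempt a resize past that limit. A minimal sketch of the arithmetic, assuming the 24-byte tuple layout shown in the diff (the class name is hypothetical and not part of the commit):

    public class CapacityOverflowSketch {
        private static final int TUPLE_SIZE = 3 * 8; // 3 longs per tuple = 24 bytes

        public static void main(String[] args) {
            int capacityTuples = 64_000_000;                       // ~64M tuples, about 1.43 GiB of data
            int grownTuples = capacityTuples + capacityTuples / 2; // old policy: grow by 50%

            // The old code sized the buffer with int math: buffer.capacity(capacity * TUPLE_SIZE)
            int intBytes = grownTuples * TUPLE_SIZE;               // overflows to a negative value
            long realBytes = (long) grownTuples * TUPLE_SIZE;      // what the resize actually needs

            System.out.println("int math:  " + intBytes);          // negative, i.e. an invalid ByteBuf capacity
            System.out.println("long math: " + realBytes
                    + " exceeds Integer.MAX_VALUE: " + (realBytes > Integer.MAX_VALUE));
        }
    }

The new SegmentedLongArray introduced below removes that ceiling by splitting storage across 16 MB segments addressed with a long offset.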

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 4e2e0323884..ec4ed02e35d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertSame;
@@ -47,6 +48,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -360,7 +362,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 4. Verify active consumer
         assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
         // get the notified with who is the leader
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
 
@@ -372,7 +375,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
         assertEquals(3, consumers.size());
         // get notified with who is the leader
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 2, false);
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
         verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -387,13 +391,17 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(4, consumers.size());
 
         // all consumers will receive notifications
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 0, true);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, false);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, false);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 2, false);
         verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
@@ -419,9 +427,11 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(2, consumers.size());
 
         // the remaining consumers will receive notifications
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
 
         // 10. Attempt to remove already removed consumer
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
new file mode 100644
index 00000000000..dc4d1c4908c
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+
+@NotThreadSafe
+public class SegmentedLongArray implements AutoCloseable {
+
+    private static final int SIZE_OF_LONG = 8;
+
+    private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
+    private final List<ByteBuf> buffers = new ArrayList<>();
+
+    @Getter
+    private final long initialCapacity;
+
+    @Getter
+    private long capacity;
+
+    public SegmentedLongArray(long initialCapacity) {
+        long remainingToAdd = initialCapacity;
+
+        // Add first segment
+        int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG);
+        buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
+        buffers.add(buffer);
+        remainingToAdd -= sizeToAdd;
+
+        // Add the remaining segments, all at full segment size, if necessary
+        while (remainingToAdd > 0) {
+            buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
+            buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
+            buffers.add(buffer);
+            remainingToAdd -= MAX_SEGMENT_SIZE;
+        }
+
+        this.initialCapacity = initialCapacity;
+        this.capacity = this.initialCapacity;
+    }
+
+    public void writeLong(long offset, long value) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
+    }
+
+    public long readLong(long offset) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
+    }
+
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {
+            // Resize the current buffer to bigger capacity
+            capacity += (capacity <= 256 ? capacity : capacity / 2);
+            capacity = Math.min(capacity, MAX_SEGMENT_SIZE);
+            buffers.get(0).capacity((int) this.capacity * SIZE_OF_LONG);
+            buffers.get(0).writerIndex((int) this.capacity * SIZE_OF_LONG);
+        } else {
+            // Let's add 1 more buffer to the list
+            int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
+            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
+            buffer.writerIndex(bufferSize);
+            buffers.add(buffer);
+            capacity += MAX_SEGMENT_SIZE;
+        }
+    }
+
+    public void shrink(long newCapacity) {
+        if (newCapacity >= capacity || newCapacity < initialCapacity) {
+            return;
+        }
+
+        long sizeToReduce = capacity - newCapacity;
+        while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) {
+            ByteBuf b = buffers.remove(buffers.size() - 1);
+            b.release();
+            capacity -= MAX_SEGMENT_SIZE;
+            sizeToReduce -= MAX_SEGMENT_SIZE;
+        }
+
+        if (buffers.size() == 1 && sizeToReduce > 0) {
+            // We should also reduce the capacity of the first buffer
+            capacity -= sizeToReduce;
+            ByteBuf oldBuffer = buffers.get(0);
+            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
+            oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG);
+            oldBuffer.release();
+            buffers.set(0, newBuffer);
+        }
+    }
+
+    @Override
+    public void close() {
+        buffers.forEach(ByteBuf::release);
+    }
+
+    /**
+     * The amount of memory used to back the array of longs.
+     */
+    public long bytesCapacity() {
+        return capacity * SIZE_OF_LONG;
+    }
+}
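The indexing scheme above maps a logical long offset to buffers.get(offset / MAX_SEGMENT_SIZE) at byte position (offset % MAX_SEGMENT_SIZE) * 8, so overall capacity is bounded by the long offset rather than by a single buffer's int capacity. A minimal usage sketch, assuming the class above is on the classpath (the wrapper class name is hypothetical):

    import org.apache.pulsar.common.util.collections.SegmentedLongArray;

    public class SegmentedLongArrayUsage {
        public static void main(String[] args) {
            // 3M longs spans two segments (2M + 1M), i.e. two pooled direct ByteBufs
            try (SegmentedLongArray array = new SegmentedLongArray(3 * 1024 * 1024)) {
                long offset = 2 * 1024 * 1024 + 5;           // lands in the second segment
                array.writeLong(offset, 42L);
                System.out.println(array.readLong(offset));  // 42
                System.out.println(array.bytesCapacity());   // 3 * 1024 * 1024 * 8 bytes
            }
        }
    }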
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 487c2284cff..50288247c64 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,9 +19,6 @@
 package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 /**
  * Provides a priority-queue implementation specialized on items composed by 3 longs.
@@ -29,33 +26,28 @@ import io.netty.buffer.PooledByteBufAllocator;
  * <p>This class is not thread safe and the items are stored in direct memory.
  */
 public class TripleLongPriorityQueue implements AutoCloseable {
-
-    private static final int SIZE_OF_LONG = 8;
     private static final int DEFAULT_INITIAL_CAPACITY = 16;
     private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
 
     // Each item is composed of 3 longs
     private static final int ITEMS_COUNT = 3;
 
-    private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
-
     /**
      * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
      */
     private static final float RESERVATION_FACTOR = 0.9f;
 
-    private ByteBuf buffer;
+    private final SegmentedLongArray array;
 
-    private final int initialCapacity;
+    // Count of how many (long,long,long) tuples are currently inserted
+    private long tuplesCount;
 
-    private int capacity;
-    private int size;
     /**
      * When size < capacity * shrinkFactor, may trigger shrinking.
      */
     private final float shrinkFactor;
 
-    private float shrinkThreshold;
+    private long shrinkThreshold;
 
     /**
      * Create a new priority queue with default initial capacity.
@@ -64,13 +56,12 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         this(DEFAULT_INITIAL_CAPACITY);
     }
 
-    public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+    public TripleLongPriorityQueue(long initialCapacity, float shrinkFactor) {
+        checkArgument(initialCapacity > 0);
         checkArgument(shrinkFactor > 0);
-        this.initialCapacity = initialCapacity;
-        this.capacity = initialCapacity;
-        this.shrinkThreshold = this.capacity * shrinkFactor;
-        this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
-        this.size = 0;
+        this.array = new SegmentedLongArray(initialCapacity * ITEMS_COUNT);
+        this.tuplesCount = 0;
+        this.shrinkThreshold = (long) (initialCapacity * shrinkFactor);
         this.shrinkFactor = shrinkFactor;
     }
 
@@ -87,7 +78,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      */
     @Override
     public void close() {
-        buffer.release();
+        array.close();
     }
 
     /**
@@ -98,13 +89,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * @param n3
      */
     public void add(long n1, long n2, long n3) {
-        if (size == capacity) {
-            increaseCapacity();
+        long arrayIdx = tuplesCount * ITEMS_COUNT;
+        if ((arrayIdx + 2) >= array.getCapacity()) {
+            array.increaseCapacity();
         }
 
-        put(size, n1, n2, n3);
-        siftUp(size);
-        ++size;
+        put(tuplesCount, n1, n2, n3);
+        siftUp(tuplesCount);
+        ++tuplesCount;
     }
 
     /**
@@ -113,8 +105,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN1() {
-        checkArgument(size != 0);
-        return buffer.getLong(0);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(0);
     }
 
     /**
@@ -123,8 +115,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN2() {
-        checkArgument(size != 0);
-        return buffer.getLong(0 + 1 * SIZE_OF_LONG);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(1);
     }
 
     /**
@@ -133,17 +125,17 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN3() {
-        checkArgument(size != 0);
-        return buffer.getLong(0 + 2 * SIZE_OF_LONG);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(2);
     }
 
     /**
      * Removes the first item from the queue.
      */
     public void pop() {
-        checkArgument(size != 0);
-        swap(0, size - 1);
-        size--;
+        checkArgument(tuplesCount != 0);
+        swap(0, tuplesCount - 1);
+        tuplesCount--;
         siftDown(0);
         shrinkCapacity();
     }
@@ -152,132 +144,125 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * Returns whether the priority queue is empty.
      */
     public boolean isEmpty() {
-        return size == 0;
+        return tuplesCount == 0;
     }
 
     /**
      * Returns the number of tuples in the priority queue.
      */
-    public int size() {
-        return size;
+    public long size() {
+        return tuplesCount;
+    }
+
+    /**
+     * The amount of memory used to back the priority queue.
+     */
+    public long bytesCapacity() {
+        return array.bytesCapacity();
     }
 
     /**
      * Clear all items.
      */
     public void clear() {
-        this.buffer.clear();
-        this.size = 0;
+        this.tuplesCount = 0;
         shrinkCapacity();
     }
 
-    private void increaseCapacity() {
-        // For bigger sizes, increase by 50%
-        this.capacity += (capacity <= 256 ? capacity : capacity / 2);
-        this.shrinkThreshold = this.capacity * shrinkFactor;
-        buffer.capacity(this.capacity * TUPLE_SIZE);
-    }
-
     private void shrinkCapacity() {
-        if (capacity > initialCapacity &&  size < shrinkThreshold) {
-            int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
-            if (decreasingSize <= 0) {
+        if (tuplesCount <= shrinkThreshold && array.getCapacity() > array.getInitialCapacity()) {
+            long sizeToShrink = (long) (array.getCapacity() * shrinkFactor * RESERVATION_FACTOR);
+            if (sizeToShrink == 0) {
                 return;
             }
-            if (capacity - decreasingSize <= initialCapacity) {
-                this.capacity = initialCapacity;
+
+            long newCapacity;
+            if (array.getCapacity() - sizeToShrink <= array.getInitialCapacity()) {
+                newCapacity = array.getInitialCapacity();
             } else {
-                this.capacity = capacity - decreasingSize;
+                newCapacity = array.getCapacity() - sizeToShrink;
             }
-            this.shrinkThreshold = this.capacity * shrinkFactor;
 
-            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
-            buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
-            buffer.release();
-            this.buffer = newBuffer;
+            array.shrink(newCapacity);
+            this.shrinkThreshold = (long) (array.getCapacity() / (double) ITEMS_COUNT * shrinkFactor);
         }
     }
 
-    private void siftUp(int idx) {
-        while (idx > 0) {
-            int parentIdx = (idx - 1) / 2;
-            if (compare(idx, parentIdx) >= 0) {
+    private void siftUp(long tupleIdx) {
+        while (tupleIdx > 0) {
+            long parentIdx = (tupleIdx - 1) / 2;
+            if (compare(tupleIdx, parentIdx) >= 0) {
                 break;
             }
 
-            swap(idx, parentIdx);
-            idx = parentIdx;
+            swap(tupleIdx, parentIdx);
+            tupleIdx = parentIdx;
         }
     }
 
-    private void siftDown(int idx) {
-        int half = size / 2;
-        while (idx < half) {
-            int left = 2 * idx + 1;
-            int right = 2 * idx + 2;
+    private void siftDown(long tupleIdx) {
+        long half = tuplesCount / 2;
+        while (tupleIdx < half) {
+            long left = 2 * tupleIdx + 1;
+            long right = 2 * tupleIdx + 2;
 
-            int swapIdx = idx;
+            long swapIdx = tupleIdx;
 
-            if (compare(idx, left) > 0) {
+            if (compare(tupleIdx, left) > 0) {
                 swapIdx = left;
             }
 
-            if (right < size && compare(swapIdx, right) > 0) {
+            if (right < tuplesCount && compare(swapIdx, right) > 0) {
                 swapIdx = right;
             }
 
-            if (swapIdx == idx) {
+            if (swapIdx == tupleIdx) {
                 return;
             }
 
-            swap(idx, swapIdx);
-            idx = swapIdx;
+            swap(tupleIdx, swapIdx);
+            tupleIdx = swapIdx;
         }
     }
 
-    private void put(int idx, long n1, long n2, long n3) {
-        int i = idx * TUPLE_SIZE;
-        buffer.setLong(i, n1);
-        buffer.setLong(i + 1 * SIZE_OF_LONG, n2);
-        buffer.setLong(i + 2 * SIZE_OF_LONG, n3);
+    private void put(long tupleIdx, long n1, long n2, long n3) {
+        long idx = tupleIdx * ITEMS_COUNT;
+        array.writeLong(idx, n1);
+        array.writeLong(idx + 1, n2);
+        array.writeLong(idx + 2, n3);
     }
 
-    private int compare(int idx1, int idx2) {
-        int i1 = idx1 * TUPLE_SIZE;
-        int i2 = idx2 * TUPLE_SIZE;
+    private int compare(long tupleIdx1, long tupleIdx2) {
+        long idx1 = tupleIdx1 * ITEMS_COUNT;
+        long idx2 = tupleIdx2 * ITEMS_COUNT;
 
-        int c1 = Long.compare(buffer.getLong(i1), buffer.getLong(i2));
+        int c1 = Long.compare(array.readLong(idx1), array.readLong(idx2));
         if (c1 != 0) {
             return c1;
         }
 
-        int c2 = Long.compare(buffer.getLong(i1 + SIZE_OF_LONG), buffer.getLong(i2 + SIZE_OF_LONG));
+        int c2 = Long.compare(array.readLong(idx1 + 1), array.readLong(idx2 + 1));
         if (c2 != 0) {
             return c2;
         }
 
-        return Long.compare(buffer.getLong(i1 + 2 * SIZE_OF_LONG), buffer.getLong(i2 + 2 * SIZE_OF_LONG));
+        return Long.compare(array.readLong(idx1 + 2), array.readLong(idx2 + 2));
     }
 
-    private void swap(int idx1, int idx2) {
-        int i1 = idx1 * TUPLE_SIZE;
-        int i2 = idx2 * TUPLE_SIZE;
+    private void swap(long tupleIdx1, long tupleIdx2) {
+        long idx1 = tupleIdx1 * ITEMS_COUNT;
+        long idx2 = tupleIdx2 * ITEMS_COUNT;
 
-        long tmp1 = buffer.getLong(i1);
-        long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG);
-        long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG);
+        long tmp1 = array.readLong(idx1);
+        long tmp2 = array.readLong(idx1 + 1);
+        long tmp3 = array.readLong(idx1 + 2);
 
-        buffer.setLong(i1, buffer.getLong(i2));
-        buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG));
-        buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG));
-
-        buffer.setLong(i2, tmp1);
-        buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
-        buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
-    }
+        array.writeLong(idx1, array.readLong(idx2));
+        array.writeLong(idx1 + 1, array.readLong(idx2 + 1));
+        array.writeLong(idx1 + 2, array.readLong(idx2 + 2));
 
-    @VisibleForTesting
-    ByteBuf getBuffer() {
-        return buffer;
+        array.writeLong(idx2, tmp1);
+        array.writeLong(idx2 + 1, tmp2);
+        array.writeLong(idx2 + 2, tmp3);
     }
 }
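With the segmented backing array, the queue now addresses tuples by a long tuple index; each tuple occupies three consecutive long slots, and the heap orders tuples by n1, then n2, then n3, as in the compare() method above. A short usage sketch (the wrapper class name is hypothetical):

    import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

    public class TripleLongPriorityQueueUsage {
        public static void main(String[] args) {
            try (TripleLongPriorityQueue pq = new TripleLongPriorityQueue()) {
                pq.add(20L, 1L, 1L);
                pq.add(10L, 2L, 3L);
                pq.add(10L, 1L, 9L);

                while (!pq.isEmpty()) {
                    // Smallest tuple first: (10, 1, 9), then (10, 2, 3), then (20, 1, 1)
                    System.out.printf("(%d, %d, %d)%n", pq.peekN1(), pq.peekN2(), pq.peekN3());
                    pq.pop();
                }
            }
        }
    }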
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
new file mode 100644
index 00000000000..efb86fd4f9d
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import lombok.Cleanup;
+import org.testng.annotations.Test;
+
+public class SegmentedLongArrayTest {
+
+    @Test
+    public void testArray() {
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(4);
+        assertEquals(a.getCapacity(), 4);
+        assertEquals(a.bytesCapacity(), 4 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        a.writeLong(0, 0);
+        a.writeLong(1, 1);
+        a.writeLong(2, 2);
+        a.writeLong(3, Long.MAX_VALUE);
+
+        try {
+            a.writeLong(4, Long.MIN_VALUE);
+            fail("should have failed");
+        } catch (IndexOutOfBoundsException e) {
+            // Expected
+        }
+
+        a.increaseCapacity();
+
+        a.writeLong(4, Long.MIN_VALUE);
+
+        assertEquals(a.getCapacity(), 8);
+        assertEquals(a.bytesCapacity(), 8 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        assertEquals(a.readLong(0), 0);
+        assertEquals(a.readLong(1), 1);
+        assertEquals(a.readLong(2), 2);
+        assertEquals(a.readLong(3), Long.MAX_VALUE);
+        assertEquals(a.readLong(4), Long.MIN_VALUE);
+
+        a.shrink(5);
+        assertEquals(a.getCapacity(), 5);
+        assertEquals(a.bytesCapacity(), 5 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+    }
+
+    @Test
+    public void testLargeArray() {
+        long initialCap = 3 * 1024 * 1024;
+
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(initialCap);
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        long baseOffset = initialCap - 100;
+
+        a.writeLong(baseOffset, 0);
+        a.writeLong(baseOffset + 1, 1);
+        a.writeLong(baseOffset + 2, 2);
+        a.writeLong(baseOffset + 3, Long.MAX_VALUE);
+        a.writeLong(baseOffset + 4, Long.MIN_VALUE);
+
+        a.increaseCapacity();
+
+        assertEquals(a.getCapacity(), 5 * 1024 * 1024);
+        assertEquals(a.bytesCapacity(), 5 * 1024 * 1024 * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        assertEquals(a.readLong(baseOffset), 0);
+        assertEquals(a.readLong(baseOffset + 1), 1);
+        assertEquals(a.readLong(baseOffset + 2), 2);
+        assertEquals(a.readLong(baseOffset + 3), Long.MAX_VALUE);
+        assertEquals(a.readLong(baseOffset + 4), Long.MIN_VALUE);
+
+        a.shrink(initialCap);
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index bd3aef86ad1..de3b1adb792 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -54,6 +54,35 @@ public class TripleLongPriorityQueueTest {
         pq.close();
     }
 
+    @Test
+    public void testLargeQueue() {
+        TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
+        assertEquals(pq.size(), 0);
+
+        final int N = 3_000_000;
+
+        for (int i = N; i > 0; i--) {
+            pq.add(i, i * 2L, i * 3L);
+        }
+
+        assertEquals(pq.size(), N);
+        assertFalse(pq.isEmpty());
+
+        for (int i = 1; i <= N; i++) {
+            assertEquals(pq.peekN1(), i);
+            assertEquals(pq.peekN2(), i * 2);
+            assertEquals(pq.peekN3(), i * 3);
+
+            pq.pop();
+
+            assertEquals(pq.size(), N - i);
+        }
+
+        pq.clear();
+        pq.close();
+    }
+
+
     @Test
     public void testCheckForEmpty() {
         TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
@@ -143,26 +172,24 @@ public class TripleLongPriorityQueueTest {
         TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
         pq.add(0, 0, 0);
         assertEquals(pq.size(), 1);
-        assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+        assertEquals(pq.bytesCapacity(), initialCapacity * tupleSize);
 
         // Scale out to capacity * 2
         triggerScaleOut(initialCapacity, pq);
         int scaleCapacity = initialCapacity * 2;
-        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        assertEquals(pq.bytesCapacity(), scaleCapacity * tupleSize);
         // Trigger shrinking
-        for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+        for (int i = 0; i < initialCapacity / 2 + 2; i++) {
              pq.pop();
         }
-        int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
-        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+        int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
+        assertTrue(pq.bytesCapacity() < scaleCapacity * tupleSize);
         // Scale out to capacity * 2
         triggerScaleOut(initialCapacity, pq);
         scaleCapacity = capacity * 2;
-        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
         // Trigger shrinking
         pq.clear();
         capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
-        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
     }
 
     private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {