You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/07/12 23:52:23 UTC

[pulsar] branch branch-2.10 updated (5f8a8f7777b -> eca6c4adf55)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 5f8a8f7777b [fix][txn] Allow producer enable send timeout in transaction (#16519)
     new 88b10e04999 Support shrink for TripleLongPriorityQueue (#15936)
     new eca6c4adf55 [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../PersistentDispatcherFailoverConsumerTest.java  |  26 ++-
 .../util/collections/SegmentedLongArray.java       | 128 ++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  | 186 ++++++++++++---------
 .../util/collections/SegmentedLongArrayTest.java   | 103 ++++++++++++
 .../collections/TripleLongPriorityQueueTest.java   |  62 +++++++
 5 files changed, 422 insertions(+), 83 deletions(-)
 create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
 create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java


[pulsar] 01/02: Support shrink for TripleLongPriorityQueue (#15936)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 88b10e04999e364bd5ca9c7287d9c98dcba77053
Author: feynmanlin <31...@qq.com>
AuthorDate: Tue Jun 7 08:52:40 2022 +0800

    Support shrink for TripleLongPriorityQueue (#15936)
    
    * Support shrinkage in TripleLongPriorityQueue
    
    * Add unit test
    
    * Remove unused code
    
    * style
    
    * Address comments
---
 .../util/collections/TripleLongPriorityQueue.java  | 59 ++++++++++++++++++++--
 .../collections/TripleLongPriorityQueueTest.java   | 35 +++++++++++++
 2 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 1d8d909beae..487c2284cff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 
@@ -31,16 +32,30 @@ public class TripleLongPriorityQueue implements AutoCloseable {
 
     private static final int SIZE_OF_LONG = 8;
     private static final int DEFAULT_INITIAL_CAPACITY = 16;
+    private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
 
     // Each item is composed of 3 longs
     private static final int ITEMS_COUNT = 3;
 
     private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
 
-    private final ByteBuf buffer;
+    /**
+     * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
+     */
+    private static final float RESERVATION_FACTOR = 0.9f;
+
+    private ByteBuf buffer;
+
+    private final int initialCapacity;
 
     private int capacity;
     private int size;
+    /**
+     * When size < capacity * shrinkFactor, may trigger shrinking.
+     */
+    private final float shrinkFactor;
+
+    private float shrinkThreshold;
 
     /**
      * Create a new priority queue with default initial capacity.
@@ -49,14 +64,22 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         this(DEFAULT_INITIAL_CAPACITY);
     }
 
+    public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+        checkArgument(shrinkFactor > 0);
+        this.initialCapacity = initialCapacity;
+        this.capacity = initialCapacity;
+        this.shrinkThreshold = this.capacity * shrinkFactor;
+        this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
+        this.size = 0;
+        this.shrinkFactor = shrinkFactor;
+    }
+
     /**
      * Create a new priority queue with a given initial capacity.
      * @param initialCapacity
      */
     public TripleLongPriorityQueue(int initialCapacity) {
-        capacity = initialCapacity;
-        buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG);
-        size = 0;
+        this(initialCapacity, DEFAULT_SHRINK_FACTOR);
     }
 
     /**
@@ -122,6 +145,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         swap(0, size - 1);
         size--;
         siftDown(0);
+        shrinkCapacity();
     }
 
     /**
@@ -144,14 +168,36 @@ public class TripleLongPriorityQueue implements AutoCloseable {
     public void clear() {
         this.buffer.clear();
         this.size = 0;
+        shrinkCapacity();
     }
 
     private void increaseCapacity() {
         // For bigger sizes, increase by 50%
         this.capacity += (capacity <= 256 ? capacity : capacity / 2);
+        this.shrinkThreshold = this.capacity * shrinkFactor;
         buffer.capacity(this.capacity * TUPLE_SIZE);
     }
 
+    private void shrinkCapacity() {
+        if (capacity > initialCapacity &&  size < shrinkThreshold) {
+            int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
+            if (decreasingSize <= 0) {
+                return;
+            }
+            if (capacity - decreasingSize <= initialCapacity) {
+                this.capacity = initialCapacity;
+            } else {
+                this.capacity = capacity - decreasingSize;
+            }
+            this.shrinkThreshold = this.capacity * shrinkFactor;
+
+            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
+            buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
+            buffer.release();
+            this.buffer = newBuffer;
+        }
+    }
+
     private void siftUp(int idx) {
         while (idx > 0) {
             int parentIdx = (idx - 1) / 2;
@@ -229,4 +275,9 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
         buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
     }
+
+    @VisibleForTesting
+    ByteBuf getBuffer() {
+        return buffer;
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index 4cb1027e0a9..bd3aef86ad1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -135,4 +135,39 @@ public class TripleLongPriorityQueueTest {
 
         pq.close();
     }
+
+    @Test
+    public void testShrink() throws Exception {
+        int initialCapacity = 20;
+        int tupleSize = 3 * 8;
+        TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
+        pq.add(0, 0, 0);
+        assertEquals(pq.size(), 1);
+        assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+
+        // Scale out to capacity * 2
+        triggerScaleOut(initialCapacity, pq);
+        int scaleCapacity = initialCapacity * 2;
+        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        // Trigger shrinking
+        for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+             pq.pop();
+        }
+        int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
+        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+        // Scale out to capacity * 2
+        triggerScaleOut(initialCapacity, pq);
+        scaleCapacity = capacity * 2;
+        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        // Trigger shrinking
+        pq.clear();
+        capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
+        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+    }
+
+    private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {
+        for (long i = 0; i < initialCapacity + 1; i++) {
+            pq.add(i, i, i);
+        }
+    }
 }


[pulsar] 02/02: [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eca6c4adf55974e8ecca94863cc41f81ab8aff5e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 12 16:12:57 2022 -0700

    [fix][broker] Fixed error when delayed messages trackers state grows to >1.5GB (#16490)
    
    * Fixed error when delayed messages trackers state grows to >1.5GB
    
    * Fixed spotbugs issues
    
    * Fixed javadocs
    
    * In the constructor, ensure all segments after the first one are of max size
    
    * Use poll to figure out where the test is stuck
    
    * Added SegmentedLongArray specific unit test
    
    * Removed unused imports
---
 .../PersistentDispatcherFailoverConsumerTest.java  |  26 ++-
 .../util/collections/SegmentedLongArray.java       | 128 ++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  | 187 ++++++++++-----------
 .../util/collections/SegmentedLongArrayTest.java   | 103 ++++++++++++
 .../collections/TripleLongPriorityQueueTest.java   |  41 ++++-
 5 files changed, 369 insertions(+), 116 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 7e697e6c39a..177aaefeb9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertSame;
@@ -46,6 +47,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -352,7 +354,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         // 4. Verify active consumer
         assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
         // get the notified with who is the leader
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
 
@@ -364,7 +367,8 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName());
         assertEquals(3, consumers.size());
         // get notified with who is the leader
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 2, false);
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
         verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1));
@@ -379,13 +383,17 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(4, consumers.size());
 
         // all consumers will receive notifications
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 0, true);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, false);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, false);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 2, false);
         verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0));
         verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1));
@@ -411,9 +419,11 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertEquals(2, consumers.size());
 
         // the remaining consumers will receive notifications
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
-        change = consumerChanges.take();
+        change = consumerChanges.poll(10, TimeUnit.SECONDS);
+        assertNotNull(change);
         verifyActiveConsumerChange(change, 1, true);
 
         // 10. Attempt to remove already removed consumer
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
new file mode 100644
index 00000000000..dc4d1c4908c
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/SegmentedLongArray.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+
+/**
+ * A growable array of primitive {@code long} values stored in pooled direct memory.
+ *
+ * <p>A single {@link ByteBuf} is indexed with {@code int}s, which limits it to 2 GB. To grow past
+ * that, the storage is split into segments of at most {@code MAX_SEGMENT_SIZE} longs (2M longs,
+ * 16 MB each). Every segment after the first one is always allocated at full size, so a
+ * {@code long} offset maps to segment {@code offset / MAX_SEGMENT_SIZE} and slot
+ * {@code offset % MAX_SEGMENT_SIZE}.
+ *
+ * <p>Instances hold direct memory until {@link #close()} is called.
+ */
+@NotThreadSafe
+public class SegmentedLongArray implements AutoCloseable {
+
+    private static final int SIZE_OF_LONG = 8;
+
+    private static final int MAX_SEGMENT_SIZE = 2 * 1024 * 1024; // 2M longs -> 16 MB
+    private final List<ByteBuf> buffers = new ArrayList<>();
+
+    /**
+     * Capacity, in longs, requested at construction. {@link #shrink(long)} never goes below it.
+     */
+    @Getter
+    private final long initialCapacity;
+
+    /**
+     * Current capacity, in longs.
+     */
+    @Getter
+    private long capacity;
+
+    /**
+     * Creates an array with room for {@code initialCapacity} longs.
+     *
+     * <p>The first segment holds {@code min(initialCapacity, MAX_SEGMENT_SIZE)} longs and any
+     * further segments are allocated at full size, so for a multi-segment array the memory actually
+     * reserved can be up to one segment larger than the reported capacity.
+     *
+     * @param initialCapacity the number of longs to reserve; also the lower bound for
+     *                        {@link #shrink(long)}
+     */
+    public SegmentedLongArray(long initialCapacity) {
+        long remainingToAdd = initialCapacity;
+
+        // Add first segment
+        int sizeToAdd = (int) Math.min(remainingToAdd, MAX_SEGMENT_SIZE);
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(sizeToAdd * SIZE_OF_LONG);
+        buffer.writerIndex(sizeToAdd * SIZE_OF_LONG);
+        buffers.add(buffer);
+        remainingToAdd -= sizeToAdd;
+
+        // Add the remaining segments, all at full segment size, if necessary
+        while (remainingToAdd > 0) {
+            buffer = PooledByteBufAllocator.DEFAULT.directBuffer(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
+            buffer.writerIndex(MAX_SEGMENT_SIZE * SIZE_OF_LONG);
+            buffers.add(buffer);
+            remainingToAdd -= MAX_SEGMENT_SIZE;
+        }
+
+        this.initialCapacity = initialCapacity;
+        this.capacity = this.initialCapacity;
+    }
+
+    /**
+     * Stores a value at the given position.
+     *
+     * @param offset position, in longs, from the start of the array
+     * @param value the value to store
+     * @throws IndexOutOfBoundsException if {@code offset} falls outside the allocated segments
+     */
+    public void writeLong(long offset, long value) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        buffers.get(bufferIdx).setLong(internalIdx * SIZE_OF_LONG, value);
+    }
+
+    /**
+     * Reads the value at the given position.
+     *
+     * @param offset position, in longs, from the start of the array
+     * @return the stored value
+     * @throws IndexOutOfBoundsException if {@code offset} falls outside the allocated segments
+     */
+    public long readLong(long offset) {
+        int bufferIdx = (int) (offset / MAX_SEGMENT_SIZE);
+        int internalIdx = (int) (offset % MAX_SEGMENT_SIZE);
+        return buffers.get(bufferIdx).getLong(internalIdx * SIZE_OF_LONG);
+    }
+
+    /**
+     * Grows the array.
+     *
+     * <p>While the array fits in one segment, its capacity doubles (when it holds at most 256
+     * longs) or grows by 50%, capped at {@code MAX_SEGMENT_SIZE}. Once the first segment is full,
+     * one more full-size segment is appended.
+     */
+    public void increaseCapacity() {
+        if (capacity < MAX_SEGMENT_SIZE) {
+            // Resize the current buffer to a bigger capacity. Grow by at least one long, otherwise
+            // doubling an empty array (capacity 0) would leave it empty forever.
+            long growth = capacity <= 256 ? Math.max(capacity, 1) : capacity / 2;
+            capacity = Math.min(capacity + growth, MAX_SEGMENT_SIZE);
+            buffers.get(0).capacity((int) this.capacity * SIZE_OF_LONG);
+            buffers.get(0).writerIndex((int) this.capacity * SIZE_OF_LONG);
+        } else {
+            // Add one more full-size segment to the list
+            int bufferSize = MAX_SEGMENT_SIZE * SIZE_OF_LONG;
+            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
+            buffer.writerIndex(bufferSize);
+            buffers.add(buffer);
+            capacity += MAX_SEGMENT_SIZE;
+        }
+    }
+
+    /**
+     * Reduces the capacity towards {@code newCapacity}, releasing memory no longer needed.
+     *
+     * <p>Nothing happens if {@code newCapacity} is not below the current capacity or is below the
+     * initial capacity. Whole trailing segments are released first; the first segment is resized
+     * (by copying it into a smaller buffer) only once it is the only one left. While more than one
+     * segment remains, a reduction smaller than a full segment is skipped, so the resulting
+     * capacity can stay above {@code newCapacity}. Values stored past the new capacity are lost.
+     *
+     * @param newCapacity the desired capacity, in longs
+     */
+    public void shrink(long newCapacity) {
+        if (newCapacity >= capacity || newCapacity < initialCapacity) {
+            return;
+        }
+
+        long sizeToReduce = capacity - newCapacity;
+        while (sizeToReduce >= MAX_SEGMENT_SIZE && buffers.size() > 1) {
+            ByteBuf b = buffers.remove(buffers.size() - 1);
+            b.release();
+            capacity -= MAX_SEGMENT_SIZE;
+            sizeToReduce -= MAX_SEGMENT_SIZE;
+        }
+
+        if (buffers.size() == 1 && sizeToReduce > 0) {
+            // We should also reduce the capacity of the first buffer
+            capacity -= sizeToReduce;
+            ByteBuf oldBuffer = buffers.get(0);
+            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer((int) capacity * SIZE_OF_LONG);
+            oldBuffer.getBytes(0, newBuffer, (int) capacity * SIZE_OF_LONG);
+            oldBuffer.release();
+            buffers.set(0, newBuffer);
+        }
+    }
+
+    /**
+     * Releases all segments. Calling this more than once has no further effect.
+     */
+    @Override
+    public void close() {
+        buffers.forEach(ByteBuf::release);
+        // Forget the released segments so that a repeated close() does not release them again
+        buffers.clear();
+    }
+
+    /**
+     * Returns the capacity in bytes ({@code capacity * 8}). This is the memory backing the array,
+     * except that a multi-segment array created by the constructor can reserve up to one segment more.
+     */
+    public long bytesCapacity() {
+        return capacity * SIZE_OF_LONG;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 487c2284cff..50288247c64 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,9 +19,6 @@
 package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 
 /**
  * Provides a priority-queue implementation specialized on items composed by 3 longs.
@@ -29,33 +26,28 @@ import io.netty.buffer.PooledByteBufAllocator;
  * <p>This class is not thread safe and the items are stored in direct memory.
  */
 public class TripleLongPriorityQueue implements AutoCloseable {
-
-    private static final int SIZE_OF_LONG = 8;
     private static final int DEFAULT_INITIAL_CAPACITY = 16;
     private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
 
     // Each item is composed of 3 longs
     private static final int ITEMS_COUNT = 3;
 
-    private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
-
     /**
      * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
      */
     private static final float RESERVATION_FACTOR = 0.9f;
 
-    private ByteBuf buffer;
+    private final SegmentedLongArray array;
 
-    private final int initialCapacity;
+    // Count of how many (long,long,long) tuples are currently inserted
+    private long tuplesCount;
 
-    private int capacity;
-    private int size;
     /**
      * When size < capacity * shrinkFactor, may trigger shrinking.
      */
     private final float shrinkFactor;
 
-    private float shrinkThreshold;
+    private long shrinkThreshold;
 
     /**
      * Create a new priority queue with default initial capacity.
@@ -64,13 +56,12 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         this(DEFAULT_INITIAL_CAPACITY);
     }
 
-    public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+    public TripleLongPriorityQueue(long initialCapacity, float shrinkFactor) {
+        checkArgument(initialCapacity > 0);
         checkArgument(shrinkFactor > 0);
-        this.initialCapacity = initialCapacity;
-        this.capacity = initialCapacity;
-        this.shrinkThreshold = this.capacity * shrinkFactor;
-        this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
-        this.size = 0;
+        this.array = new SegmentedLongArray(initialCapacity * ITEMS_COUNT);
+        this.tuplesCount = 0;
+        this.shrinkThreshold = (long) (initialCapacity * shrinkFactor);
         this.shrinkFactor = shrinkFactor;
     }
 
@@ -87,7 +78,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      */
     @Override
     public void close() {
-        buffer.release();
+        array.close();
     }
 
     /**
@@ -98,13 +89,18 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * @param n3
      */
     public void add(long n1, long n2, long n3) {
-        if (size == capacity) {
-            increaseCapacity();
+        long arrayIdx = tuplesCount * ITEMS_COUNT;
+        // The new tuple occupies longs [arrayIdx, arrayIdx + 2]. Loop because one growth step may not be
+        // enough: SegmentedLongArray caps it at a full segment (e.g. 2097150 -> 2097152 longs).
+        while ((arrayIdx + 2) >= array.getCapacity()) {
+            array.increaseCapacity();
+            // Keep the shrink threshold proportional to the grown capacity, as shrinkCapacity() does
+            this.shrinkThreshold = (long) (array.getCapacity() / (double) ITEMS_COUNT * shrinkFactor);
         }

-        put(size, n1, n2, n3);
-        siftUp(size);
-        ++size;
+        put(tuplesCount, n1, n2, n3);
+        siftUp(tuplesCount);
+        ++tuplesCount;
     }
 
     /**
@@ -113,8 +105,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN1() {
-        checkArgument(size != 0);
-        return buffer.getLong(0);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(0);
     }
 
     /**
@@ -123,8 +115,8 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN2() {
-        checkArgument(size != 0);
-        return buffer.getLong(0 + 1 * SIZE_OF_LONG);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(1);
     }
 
     /**
@@ -133,17 +125,17 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * <p>The tuple will not be extracted
      */
     public long peekN3() {
-        checkArgument(size != 0);
-        return buffer.getLong(0 + 2 * SIZE_OF_LONG);
+        checkArgument(tuplesCount != 0);
+        return array.readLong(2);
     }
 
     /**
      * Removes the first item from the queue.
      */
     public void pop() {
-        checkArgument(size != 0);
-        swap(0, size - 1);
-        size--;
+        checkArgument(tuplesCount != 0);
+        swap(0, tuplesCount - 1);
+        tuplesCount--;
         siftDown(0);
         shrinkCapacity();
     }
@@ -152,132 +144,125 @@ public class TripleLongPriorityQueue implements AutoCloseable {
      * Returns whether the priority queue is empty.
      */
     public boolean isEmpty() {
-        return size == 0;
+        return tuplesCount == 0;
     }
 
     /**
      * Returns the number of tuples in the priority queue.
      */
-    public int size() {
-        return size;
+    public long size() {
+        return tuplesCount;
+    }
+
+    /**
+     * The amount of memory used to back the priority queue.
+     */
+    public long bytesCapacity() {
+        return array.bytesCapacity();
     }
 
     /**
      * Clear all items.
      */
     public void clear() {
-        this.buffer.clear();
-        this.size = 0;
+        this.tuplesCount = 0;
         shrinkCapacity();
     }
 
-    private void increaseCapacity() {
-        // For bigger sizes, increase by 50%
-        this.capacity += (capacity <= 256 ? capacity : capacity / 2);
-        this.shrinkThreshold = this.capacity * shrinkFactor;
-        buffer.capacity(this.capacity * TUPLE_SIZE);
-    }
-
     private void shrinkCapacity() {
-        if (capacity > initialCapacity &&  size < shrinkThreshold) {
-            int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
-            if (decreasingSize <= 0) {
+        if (tuplesCount <= shrinkThreshold && array.getCapacity() > array.getInitialCapacity()) {
+            long sizeToShrink = (long) (array.getCapacity() * shrinkFactor * RESERVATION_FACTOR);
+            if (sizeToShrink == 0) {
                 return;
             }
-            if (capacity - decreasingSize <= initialCapacity) {
-                this.capacity = initialCapacity;
+            // Target capacity in longs; clamped below to the live longs (needed if shrinkFactor > ~0.53)
+            long newCapacity;
+            if (array.getCapacity() - sizeToShrink <= array.getInitialCapacity()) {
+                newCapacity = array.getInitialCapacity();
             } else {
-                this.capacity = capacity - decreasingSize;
+                newCapacity = array.getCapacity() - sizeToShrink;
             }
-            this.shrinkThreshold = this.capacity * shrinkFactor;

-            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
-            buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
-            buffer.release();
-            this.buffer = newBuffer;
+            array.shrink(Math.max(newCapacity, tuplesCount * ITEMS_COUNT));
+            this.shrinkThreshold = (long) (array.getCapacity() / (double) ITEMS_COUNT * shrinkFactor);
         }
     }
 
-    private void siftUp(int idx) {
-        while (idx > 0) {
-            int parentIdx = (idx - 1) / 2;
-            if (compare(idx, parentIdx) >= 0) {
+    private void siftUp(long tupleIdx) {
+        while (tupleIdx > 0) {
+            long parentIdx = (tupleIdx - 1) / 2;
+            if (compare(tupleIdx, parentIdx) >= 0) {
                 break;
             }
 
-            swap(idx, parentIdx);
-            idx = parentIdx;
+            swap(tupleIdx, parentIdx);
+            tupleIdx = parentIdx;
         }
     }
 
-    private void siftDown(int idx) {
-        int half = size / 2;
-        while (idx < half) {
-            int left = 2 * idx + 1;
-            int right = 2 * idx + 2;
+    private void siftDown(long tupleIdx) {
+        long half = tuplesCount / 2;
+        while (tupleIdx < half) {
+            long left = 2 * tupleIdx + 1;
+            long right = 2 * tupleIdx + 2;
 
-            int swapIdx = idx;
+            long swapIdx = tupleIdx;
 
-            if (compare(idx, left) > 0) {
+            if (compare(tupleIdx, left) > 0) {
                 swapIdx = left;
             }
 
-            if (right < size && compare(swapIdx, right) > 0) {
+            if (right < tuplesCount && compare(swapIdx, right) > 0) {
                 swapIdx = right;
             }
 
-            if (swapIdx == idx) {
+            if (swapIdx == tupleIdx) {
                 return;
             }
 
-            swap(idx, swapIdx);
-            idx = swapIdx;
+            swap(tupleIdx, swapIdx);
+            tupleIdx = swapIdx;
         }
     }
 
-    private void put(int idx, long n1, long n2, long n3) {
-        int i = idx * TUPLE_SIZE;
-        buffer.setLong(i, n1);
-        buffer.setLong(i + 1 * SIZE_OF_LONG, n2);
-        buffer.setLong(i + 2 * SIZE_OF_LONG, n3);
+    private void put(long tupleIdx, long n1, long n2, long n3) {
+        long idx = tupleIdx * ITEMS_COUNT;
+        array.writeLong(idx, n1);
+        array.writeLong(idx + 1, n2);
+        array.writeLong(idx + 2, n3);
     }
 
-    private int compare(int idx1, int idx2) {
-        int i1 = idx1 * TUPLE_SIZE;
-        int i2 = idx2 * TUPLE_SIZE;
+    private int compare(long tupleIdx1, long tupleIdx2) {
+        long idx1 = tupleIdx1 * ITEMS_COUNT;
+        long idx2 = tupleIdx2 * ITEMS_COUNT;
 
-        int c1 = Long.compare(buffer.getLong(i1), buffer.getLong(i2));
+        int c1 = Long.compare(array.readLong(idx1), array.readLong(idx2));
         if (c1 != 0) {
             return c1;
         }
 
-        int c2 = Long.compare(buffer.getLong(i1 + SIZE_OF_LONG), buffer.getLong(i2 + SIZE_OF_LONG));
+        int c2 = Long.compare(array.readLong(idx1 + 1), array.readLong(idx2 + 1));
         if (c2 != 0) {
             return c2;
         }
 
-        return Long.compare(buffer.getLong(i1 + 2 * SIZE_OF_LONG), buffer.getLong(i2 + 2 * SIZE_OF_LONG));
+        return Long.compare(array.readLong(idx1 + 2), array.readLong(idx2 + 2));
     }
 
-    private void swap(int idx1, int idx2) {
-        int i1 = idx1 * TUPLE_SIZE;
-        int i2 = idx2 * TUPLE_SIZE;
+    private void swap(long tupleIdx1, long tupleIdx2) {
+        long idx1 = tupleIdx1 * ITEMS_COUNT;
+        long idx2 = tupleIdx2 * ITEMS_COUNT;
 
-        long tmp1 = buffer.getLong(i1);
-        long tmp2 = buffer.getLong(i1 + 1 * SIZE_OF_LONG);
-        long tmp3 = buffer.getLong(i1 + 2 * SIZE_OF_LONG);
+        long tmp1 = array.readLong(idx1);
+        long tmp2 = array.readLong(idx1 + 1);
+        long tmp3 = array.readLong(idx1 + 2);
 
-        buffer.setLong(i1, buffer.getLong(i2));
-        buffer.setLong(i1 + 1 * SIZE_OF_LONG, buffer.getLong(i2 + 1 * SIZE_OF_LONG));
-        buffer.setLong(i1 + 2 * SIZE_OF_LONG, buffer.getLong(i2 + 2 * SIZE_OF_LONG));
-
-        buffer.setLong(i2, tmp1);
-        buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
-        buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
-    }
+        array.writeLong(idx1, array.readLong(idx2));
+        array.writeLong(idx1 + 1, array.readLong(idx2 + 1));
+        array.writeLong(idx1 + 2, array.readLong(idx2 + 2));
 
-    @VisibleForTesting
-    ByteBuf getBuffer() {
-        return buffer;
+        array.writeLong(idx2, tmp1);
+        array.writeLong(idx2 + 1, tmp2);
+        array.writeLong(idx2 + 2, tmp3);
     }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
new file mode 100644
index 00000000000..efb86fd4f9d
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/SegmentedLongArrayTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import lombok.Cleanup;
+import org.testng.annotations.Test;
+
+public class SegmentedLongArrayTest { // read/write, growth and shrink tests for SegmentedLongArray
+
+    @Test
+    public void testArray() { // small array: write, out-of-bounds write, grow, read back, shrink
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(4); // capacity counts longs; bytesCapacity() is 8 bytes each
+        assertEquals(a.getCapacity(), 4);
+        assertEquals(a.bytesCapacity(), 4 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        a.writeLong(0, 0);
+        a.writeLong(1, 1);
+        a.writeLong(2, 2);
+        a.writeLong(3, Long.MAX_VALUE);
+
+        try {
+            a.writeLong(4, Long.MIN_VALUE); // index 4 is one past the last slot of a 4-long array
+            fail("should have failed");
+        } catch (IndexOutOfBoundsException e) {
+            // Expected: writes beyond the current capacity are rejected
+        }
+
+        a.increaseCapacity(); // expected to grow this small array from 4 to 8 longs (asserted below)
+
+        a.writeLong(4, Long.MIN_VALUE);
+
+        assertEquals(a.getCapacity(), 8);
+        assertEquals(a.bytesCapacity(), 8 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+
+        assertEquals(a.readLong(0), 0);
+        assertEquals(a.readLong(1), 1);
+        assertEquals(a.readLong(2), 2);
+        assertEquals(a.readLong(3), Long.MAX_VALUE);
+        assertEquals(a.readLong(4), Long.MIN_VALUE);
+
+        a.shrink(5); // shrink to an explicit capacity; the initial capacity must stay unchanged
+        assertEquals(a.getCapacity(), 5);
+        assertEquals(a.bytesCapacity(), 5 * 8);
+        assertEquals(a.getInitialCapacity(), 4);
+    }
+
+    @Test
+    public void testLargeArray() { // large array: values near the end must survive growth, then shrink back
+        long initialCap = 3 * 1024 * 1024; // 3M longs (24 MiB)
+
+        @Cleanup
+        SegmentedLongArray a = new SegmentedLongArray(initialCap);
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        long baseOffset = initialCap - 100; // write close to the end of the original capacity
+
+        a.writeLong(baseOffset, 0);
+        a.writeLong(baseOffset + 1, 1);
+        a.writeLong(baseOffset + 2, 2);
+        a.writeLong(baseOffset + 3, Long.MAX_VALUE);
+        a.writeLong(baseOffset + 4, Long.MIN_VALUE);
+
+        a.increaseCapacity(); // at this size growth is 3M -> 5M longs, not a doubling (asserted below)
+
+        assertEquals(a.getCapacity(), 5 * 1024 * 1024);
+        assertEquals(a.bytesCapacity(), 5 * 1024 * 1024 * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+
+        assertEquals(a.readLong(baseOffset), 0);
+        assertEquals(a.readLong(baseOffset + 1), 1);
+        assertEquals(a.readLong(baseOffset + 2), 2);
+        assertEquals(a.readLong(baseOffset + 3), Long.MAX_VALUE);
+        assertEquals(a.readLong(baseOffset + 4), Long.MIN_VALUE);
+
+        a.shrink(initialCap); // shrink back to the initial capacity
+        assertEquals(a.getCapacity(), initialCap);
+        assertEquals(a.bytesCapacity(), initialCap * 8);
+        assertEquals(a.getInitialCapacity(), initialCap);
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index bd3aef86ad1..de3b1adb792 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -54,6 +54,35 @@ public class TripleLongPriorityQueueTest {
         pq.close();
     }
 
+    @Test
+    public void testLargeQueue() { // push 3M tuples and check they pop back in ascending order
+        TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
+        assertEquals(pq.size(), 0);
+
+        final int N = 3_000_000; // NOTE(review): presumably sized to need a multi-segment backing array; confirm
+
+        for (int i = N; i > 0; i--) { // insert in descending order so the queue must reorder every element
+            pq.add(i, i * 2L, i * 3L);
+        }
+
+        assertEquals(pq.size(), N);
+        assertFalse(pq.isEmpty());
+
+        for (int i = 1; i <= N; i++) { // the head must come out in ascending n1 order (n2, n3 derived from n1)
+            assertEquals(pq.peekN1(), i);
+            assertEquals(pq.peekN2(), i * 2);
+            assertEquals(pq.peekN3(), i * 3);
+
+            pq.pop();
+
+            assertEquals(pq.size(), N - i);
+        }
+
+        pq.clear();
+        pq.close(); // NOTE(review): not reached if an assertion above fails; try/finally would guarantee it
+    }
+
+
     @Test
     public void testCheckForEmpty() {
         TripleLongPriorityQueue pq = new TripleLongPriorityQueue();
@@ -143,26 +172,24 @@ public class TripleLongPriorityQueueTest {
         TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
         pq.add(0, 0, 0);
         assertEquals(pq.size(), 1);
-        assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+        assertEquals(pq.bytesCapacity(), initialCapacity * tupleSize);
 
         // Scale out to capacity * 2
         triggerScaleOut(initialCapacity, pq);
         int scaleCapacity = initialCapacity * 2;
-        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        assertEquals(pq.bytesCapacity(), scaleCapacity * tupleSize);
         // Trigger shrinking
-        for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+        for (int i = 0; i < initialCapacity / 2 + 2; i++) {
              pq.pop();
         }
-        int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
-        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+        int capacity = scaleCapacity - (int)((scaleCapacity ) * 0.5f * 0.9f);
+        assertTrue(pq.bytesCapacity() < scaleCapacity * tupleSize);
         // Scale out to capacity * 2
         triggerScaleOut(initialCapacity, pq);
         scaleCapacity = capacity * 2;
-        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
         // Trigger shrinking
         pq.clear();
         capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
-        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
     }
 
     private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {