You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/07 00:52:47 UTC

[pulsar] branch master updated: Support shrink for TripleLongPriorityQueue (#15936)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 522afcfc44f Support shrink for TripleLongPriorityQueue (#15936)
522afcfc44f is described below

commit 522afcfc44f5be92811fec22cf00d357bedb8ad6
Author: feynmanlin <31...@qq.com>
AuthorDate: Tue Jun 7 08:52:40 2022 +0800

    Support shrink for TripleLongPriorityQueue (#15936)
    
    * Support shrinkage in TripleLongPriorityQueue
    
    * Add unit test
    
    * Remove unused code
    
    * style
    
    * Address comments
---
 .../util/collections/TripleLongPriorityQueue.java  | 59 ++++++++++++++++++++--
 .../collections/TripleLongPriorityQueueTest.java   | 35 +++++++++++++
 2 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 1d8d909beae..487c2284cff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.util.collections;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 
@@ -31,16 +32,30 @@ public class TripleLongPriorityQueue implements AutoCloseable {
 
     private static final int SIZE_OF_LONG = 8;
     private static final int DEFAULT_INITIAL_CAPACITY = 16;
+    private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
 
     // Each item is composed of 3 longs
     private static final int ITEMS_COUNT = 3;
 
     private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
 
-    private final ByteBuf buffer;
+    /**
+     * Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
+     */
+    private static final float RESERVATION_FACTOR = 0.9f;
+
+    private ByteBuf buffer;
+
+    private final int initialCapacity;
 
     private int capacity;
     private int size;
+    /**
+     * When size < capacity * shrinkFactor, may trigger shrinking.
+     */
+    private final float shrinkFactor;
+
+    private float shrinkThreshold;
 
     /**
      * Create a new priority queue with default initial capacity.
@@ -49,14 +64,22 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         this(DEFAULT_INITIAL_CAPACITY);
     }
 
+    public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+        checkArgument(shrinkFactor > 0);
+        this.initialCapacity = initialCapacity;
+        this.capacity = initialCapacity;
+        this.shrinkThreshold = this.capacity * shrinkFactor;
+        this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
+        this.size = 0;
+        this.shrinkFactor = shrinkFactor;
+    }
+
     /**
      * Create a new priority queue with a given initial capacity.
      * @param initialCapacity
      */
     public TripleLongPriorityQueue(int initialCapacity) {
-        capacity = initialCapacity;
-        buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG);
-        size = 0;
+        this(initialCapacity, DEFAULT_SHRINK_FACTOR);
     }
 
     /**
@@ -122,6 +145,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         swap(0, size - 1);
         size--;
         siftDown(0);
+        shrinkCapacity();
     }
 
     /**
@@ -144,14 +168,36 @@ public class TripleLongPriorityQueue implements AutoCloseable {
     public void clear() {
         this.buffer.clear();
         this.size = 0;
+        shrinkCapacity();
     }
 
     private void increaseCapacity() {
         // For bigger sizes, increase by 50%
         this.capacity += (capacity <= 256 ? capacity : capacity / 2);
+        this.shrinkThreshold = this.capacity * shrinkFactor;
         buffer.capacity(this.capacity * TUPLE_SIZE);
     }
 
+    private void shrinkCapacity() {
+        if (capacity > initialCapacity &&  size < shrinkThreshold) {
+            int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
+            if (decreasingSize <= 0) {
+                return;
+            }
+            if (capacity - decreasingSize <= initialCapacity) {
+                this.capacity = initialCapacity;
+            } else {
+                this.capacity = capacity - decreasingSize;
+            }
+            this.shrinkThreshold = this.capacity * shrinkFactor;
+
+            ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
+            buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
+            buffer.release();
+            this.buffer = newBuffer;
+        }
+    }
+
     private void siftUp(int idx) {
         while (idx > 0) {
             int parentIdx = (idx - 1) / 2;
@@ -229,4 +275,9 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
         buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
     }
+
+    @VisibleForTesting
+    ByteBuf getBuffer() {
+        return buffer;
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index 4cb1027e0a9..bd3aef86ad1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -135,4 +135,39 @@ public class TripleLongPriorityQueueTest {
 
         pq.close();
     }
+
+    @Test
+    public void testShrink() throws Exception {
+        int initialCapacity = 20;
+        int tupleSize = 3 * 8;
+        TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
+        pq.add(0, 0, 0);
+        assertEquals(pq.size(), 1);
+        assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+
+        // Scale out to capacity * 2
+        triggerScaleOut(initialCapacity, pq);
+        int scaleCapacity = initialCapacity * 2;
+        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        // Trigger shrinking
+        for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+             pq.pop();
+        }
+        int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
+        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+        // Scale out to capacity * 2
+        triggerScaleOut(initialCapacity, pq);
+        scaleCapacity = capacity * 2;
+        assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+        // Trigger shrinking
+        pq.clear();
+        capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
+        assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+    }
+
+    private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {
+        for (long i = 0; i < initialCapacity + 1; i++) {
+            pq.add(i, i, i);
+        }
+    }
 }