You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/07 00:52:47 UTC
[pulsar] branch master updated: Support shrink for TripleLongPriorityQueue (#15936)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 522afcfc44f Support shrink for TripleLongPriorityQueue (#15936)
522afcfc44f is described below
commit 522afcfc44f5be92811fec22cf00d357bedb8ad6
Author: feynmanlin <31...@qq.com>
AuthorDate: Tue Jun 7 08:52:40 2022 +0800
Support shrink for TripleLongPriorityQueue (#15936)
* Support shrinkage in TripleLongPriorityQueue
* Add unit test
* Remove unused code
* style
* Address comments
---
.../util/collections/TripleLongPriorityQueue.java | 59 ++++++++++++++++++++--
.../collections/TripleLongPriorityQueueTest.java | 35 +++++++++++++
2 files changed, 90 insertions(+), 4 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 1d8d909beae..487c2284cff 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@@ -31,16 +32,30 @@ public class TripleLongPriorityQueue implements AutoCloseable {
private static final int SIZE_OF_LONG = 8;
private static final int DEFAULT_INITIAL_CAPACITY = 16;
+ private static final float DEFAULT_SHRINK_FACTOR = 0.5f;
// Each item is composed of 3 longs
private static final int ITEMS_COUNT = 3;
private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;
- private final ByteBuf buffer;
+ /**
+ * When shrinking, release only 90% of the computed amount (capacity * shrinkFactor) and keep
+ * the remaining 10% in reserve, to avoid frequent expansion and shrinkage.
+ */
+ private static final float RESERVATION_FACTOR = 0.9f;
+
+ private ByteBuf buffer;
+
+ private final int initialCapacity;
private int capacity;
private int size;
+ /**
+ * When size < capacity * shrinkFactor, may trigger shrinking.
+ */
+ private final float shrinkFactor;
+
+ private float shrinkThreshold;
/**
* Create a new priority queue with default initial capacity.
@@ -49,14 +64,22 @@ public class TripleLongPriorityQueue implements AutoCloseable {
this(DEFAULT_INITIAL_CAPACITY);
}
+ /**
+  * Create a new priority queue with a given initial capacity and shrink factor.
+  * @param initialCapacity number of tuples the backing buffer is initially sized for; must be positive
+  * @param shrinkFactor fraction of the capacity below which the size may trigger shrinking; must be positive
+  * @throws IllegalArgumentException if either argument is not positive
+  */
+ public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
+     // A capacity of 0 could never grow: increaseCapacity() adds a multiple of the current capacity.
+     checkArgument(initialCapacity > 0, "initialCapacity must be positive: %s", initialCapacity);
+     // Written as "> 0" so that NaN is rejected as well.
+     checkArgument(shrinkFactor > 0, "shrinkFactor must be positive: %s", shrinkFactor);
+     this.initialCapacity = initialCapacity;
+     this.capacity = initialCapacity;
+     this.shrinkFactor = shrinkFactor;
+     this.shrinkThreshold = this.capacity * shrinkFactor;
+     this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
+     this.size = 0;
+ }
+
+
/**
* Create a new priority queue with a given initial capacity.
* @param initialCapacity the number of tuples the backing buffer is initially sized for
*/
public TripleLongPriorityQueue(int initialCapacity) {
- capacity = initialCapacity;
- buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG);
- size = 0;
+ this(initialCapacity, DEFAULT_SHRINK_FACTOR);
}
/**
@@ -122,6 +145,7 @@ public class TripleLongPriorityQueue implements AutoCloseable {
swap(0, size - 1);
size--;
siftDown(0);
+ shrinkCapacity();
}
/**
@@ -144,14 +168,36 @@ public class TripleLongPriorityQueue implements AutoCloseable {
// Empties the queue: resets the buffer's reader/writer indices and the element count.
public void clear() {
this.buffer.clear();
this.size = 0;
// With size at 0, let shrinkCapacity() reduce the buffer if it has grown past the initial capacity.
+ shrinkCapacity();
}
// Grows the capacity (counted in tuples): doubles it while it is at most 256, otherwise adds 50%.
private void increaseCapacity() {
// For bigger sizes, increase by 50%
this.capacity += (capacity <= 256 ? capacity : capacity / 2);
// Keep the shrink threshold in step with the new capacity.
+ this.shrinkThreshold = this.capacity * shrinkFactor;
// The buffer is sized in bytes, hence the multiplication by the tuple size.
buffer.capacity(this.capacity * TUPLE_SIZE);
}
+ /**
+  * Shrinks the backing buffer when the capacity has grown beyond the initial capacity and the
+  * number of stored tuples is below {@code shrinkThreshold}.
+  *
+  * <p>The capacity is reduced by {@code capacity * shrinkFactor * RESERVATION_FACTOR}, but never
+  * below the initial capacity and never below the current size. The live tuples are copied into
+  * a newly allocated buffer and the old buffer is released.
+  */
+ private void shrinkCapacity() {
+     if (capacity <= initialCapacity || size >= shrinkThreshold) {
+         return;
+     }
+     int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
+     if (decreasingSize <= 0) {
+         return;
+     }
+     // Clamp to the initial capacity and to the number of live tuples. With a shrinkFactor above
+     // 1 / 1.9 (about 0.53) the reduced capacity could otherwise be smaller than size, and the
+     // copy below would not fit into the new buffer.
+     int newCapacity = Math.max(Math.max(capacity - decreasingSize, initialCapacity), size);
+     if (newCapacity >= capacity) {
+         return;
+     }
+
+     // Allocate and copy before touching any field, so a failed allocation leaves the queue intact.
+     ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(newCapacity * TUPLE_SIZE);
+     buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
+     buffer.release();
+
+     this.buffer = newBuffer;
+     this.capacity = newCapacity;
+     this.shrinkThreshold = this.capacity * shrinkFactor;
+ }
+
private void siftUp(int idx) {
while (idx > 0) {
int parentIdx = (idx - 1) / 2;
@@ -229,4 +275,9 @@ public class TripleLongPriorityQueue implements AutoCloseable {
buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
}
+
+ /**
+  * Returns the buffer currently backing this queue so that tests can inspect it.
+  * The instance is replaced when the queue shrinks, so it should be re-read after each operation.
+  */
+ @VisibleForTesting
+ ByteBuf getBuffer() {
+     return this.buffer;
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
index 4cb1027e0a9..bd3aef86ad1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueueTest.java
@@ -135,4 +135,39 @@ public class TripleLongPriorityQueueTest {
pq.close();
}
+
+ /**
+  * Verifies that the backing buffer grows when the queue overflows and shrinks again after
+  * enough items are popped or the queue is cleared.
+  */
+ @Test
+ public void testShrink() throws Exception {
+     int initialCapacity = 20;
+     int tupleSize = 3 * 8;
+     // try-with-resources closes the queue even when an assertion fails; the preceding test
+     // closes its queue explicitly with pq.close().
+     try (TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f)) {
+         pq.add(0, 0, 0);
+         assertEquals(pq.size(), 1);
+         assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);
+
+         // Scale out to capacity * 2
+         triggerScaleOut(initialCapacity, pq);
+         int scaleCapacity = initialCapacity * 2;
+         assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+         // Trigger shrinking
+         for (int i = 0; i < initialCapacity / 2 + 1; i++) {
+             pq.pop();
+         }
+         // Expected capacity after one shrink step: capacity - capacity * shrinkFactor * 0.9
+         int capacity = scaleCapacity - (int) (scaleCapacity * 0.5f * 0.9f);
+         assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+         // Scale out to capacity * 2
+         triggerScaleOut(initialCapacity, pq);
+         scaleCapacity = capacity * 2;
+         assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
+         // Trigger shrinking
+         pq.clear();
+         capacity = scaleCapacity - (int) (scaleCapacity * 0.5f * 0.9f);
+         assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
+     }
+ }
+
+ private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {
+ for (long i = 0; i < initialCapacity + 1; i++) {
+ pq.add(i, i, i);
+ }
+ }
}