You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:26 UTC
[pulsar] 15/15: [improve][broker] Support shrink for ConcurrentSortedLongPairSet (#15354)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9e8c3242f70947d7137f858a8e28ff18df5dffed
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 20:30:30 2022 +0800
[improve][broker] Support shrink for ConcurrentSortedLongPairSet (#15354)
(cherry picked from commit 24d4d76bb9e39010bae3f4cbd8ddba6422570b4e)
---
.../persistent/MessageRedeliveryController.java | 2 +-
.../util/collections/ConcurrentLongPairSet.java | 53 ++++++++++++----------
.../collections/ConcurrentSortedLongPairSet.java | 27 +++++++++--
.../common/util/collections/LongPairSet.java | 7 +++
.../ConcurrentSortedLongPairSetTest.java | 43 ++++++++++++++++++
5 files changed, 105 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index c7f96fffcef..46fa1b2b050 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -36,7 +36,7 @@ public class MessageRedeliveryController {
private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
- this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+ this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
this.hashesToBeBlocked = allowOutOfOrderDelivery
? null
: ConcurrentLongLongPairHashMap
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 66ecaee4bfa..7b5e75813fa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -175,6 +175,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
return size;
}
+ @Override
public long capacity() {
long capacity = 0;
for (int i = 0; i < sections.length; i++) {
@@ -447,20 +448,7 @@ public class ConcurrentLongPairSet implements LongPairSet {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (autoShrink && size < resizeThresholdBelow) {
- try {
- int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
- int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
- if (newCapacity < capacity && newResizeThresholdUp > size) {
- // shrink the hashmap
- rehash(newCapacity);
- }
- } finally {
- unlockWrite(stamp);
- }
- } else {
- unlockWrite(stamp);
- }
+ tryShrinkThenUnlock(stamp);
}
}
@@ -469,23 +457,42 @@ public class ConcurrentLongPairSet implements LongPairSet {
int removedItems = 0;
// Go through all the buckets for this section
- for (int bucket = 0; bucket < table.length; bucket += 2) {
- long storedItem1 = table[bucket];
- long storedItem2 = table[bucket + 1];
-
- if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
- if (filter.test(storedItem1, storedItem2)) {
- long h = hash(storedItem1, storedItem2);
- if (remove(storedItem1, storedItem2, (int) h)) {
+ long stamp = writeLock();
+ try {
+ for (int bucket = 0; bucket < table.length; bucket += 2) {
+ long storedItem1 = table[bucket];
+ long storedItem2 = table[bucket + 1];
+ if (storedItem1 != DeletedItem && storedItem1 != EmptyItem) {
+ if (filter.test(storedItem1, storedItem2)) {
+ SIZE_UPDATER.decrementAndGet(this);
+ cleanBucket(bucket);
removedItems++;
}
}
}
+ } finally {
+ tryShrinkThenUnlock(stamp);
}
-
return removedItems;
}
+ private void tryShrinkThenUnlock(long stamp) {
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
+ }
+
private void cleanBucket(int bucket) {
int nextInArray = (bucket + 2) & (table.length - 1);
if (table[nextInArray] == EmptyItem) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index e4cb668fc92..06efd0490d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -48,14 +48,15 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairC
public class ConcurrentSortedLongPairSet implements LongPairSet {
protected final NavigableMap<Long, ConcurrentLongPairSet> longPairSets = new ConcurrentSkipListMap<>();
- private int expectedItems;
- private int concurrencyLevel;
+ private final int expectedItems;
+ private final int concurrencyLevel;
/**
* If {@link #longPairSets} adds and removes the item-set frequently then it allocates and removes
* {@link ConcurrentLongPairSet} for the same item multiple times which can lead to gc-pauses. To avoid such
* a situation, avoid removing empty LongPairSet until it reaches max limit.
*/
- private int maxAllowedSetOnRemove;
+ private final int maxAllowedSetOnRemove;
+ private final boolean autoShrink;
private static final int DEFAULT_MAX_ALLOWED_SET_ON_REMOVE = 10;
public ConcurrentSortedLongPairSet() {
@@ -70,10 +71,20 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE);
}
+ public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, boolean autoShrink) {
+ this(expectedItems, concurrencyLevel, DEFAULT_MAX_ALLOWED_SET_ON_REMOVE, autoShrink);
+ }
+
public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove) {
+ this(expectedItems, concurrencyLevel, maxAllowedSetOnRemove, false);
+ }
+
+ public ConcurrentSortedLongPairSet(int expectedItems, int concurrencyLevel, int maxAllowedSetOnRemove,
+ boolean autoShrink) {
this.expectedItems = expectedItems;
this.concurrencyLevel = concurrencyLevel;
this.maxAllowedSetOnRemove = maxAllowedSetOnRemove;
+ this.autoShrink = autoShrink;
}
@Override
@@ -82,6 +93,7 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
(key) -> ConcurrentLongPairSet.newBuilder()
.expectedItems(expectedItems)
.concurrencyLevel(concurrencyLevel)
+ .autoShrink(autoShrink)
.build());
return messagesToReplay.add(item1, item2);
}
@@ -194,6 +206,15 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
return size.get();
}
+ @Override
+ public long capacity() {
+ AtomicLong capacity = new AtomicLong(0);
+ longPairSets.forEach((item1, longPairSet) -> {
+ capacity.getAndAdd(longPairSet.capacity());
+ });
+ return capacity.get();
+ }
+
@Override
public boolean contains(long item1, long item2) {
ConcurrentLongPairSet longPairSet = longPairSets.get(item1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
index 32de7e4c232..f27b994f777 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairSet.java
@@ -107,6 +107,13 @@ public interface LongPairSet {
*/
long size();
+ /**
+ * Returns the capacity of the set.
+ *
+ * @return the current capacity of the set
+ */
+ long capacity();
+
/**
* Checks if given (item1,item2) composite value exists into set.
*
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
index fcb9884a795..62dfa21dc81 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
+import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
@@ -181,6 +182,20 @@ public class ConcurrentSortedLongPairSetTest {
values = new ArrayList<>(set.items());
values.sort(null);
assertEquals(values, Lists.newArrayList(new LongPair(6, 6), new LongPair(7, 7)));
+
+ set = new ConcurrentSortedLongPairSet(128, 2, true);
+ set.add(2, 2);
+ set.add(1, 3);
+ set.add(3, 1);
+ set.add(2, 1);
+ set.add(3, 2);
+ set.add(1, 2);
+ set.add(1, 1);
+ removeItems = set.removeIf((ledgerId, entryId) -> {
+ return ComparisonChain.start().compare(ledgerId, 1).compare(entryId, 3)
+ .result() <= 0;
+ });
+ assertEquals(removeItems, 3);
}
@Test
@@ -245,4 +260,32 @@ public class ConcurrentSortedLongPairSetTest {
set.add(1, 1);
assertFalse(set.isEmpty());
}
+
+ @Test
+ public void testShrink() {
+ LongPairSet set = new ConcurrentSortedLongPairSet(2, 1, true);
+ set.add(0, 0);
+ assertTrue(set.capacity() == 4);
+ set.add(0, 1);
+ assertTrue(set.capacity() == 4);
+ set.add(1, 1);
+ assertTrue(set.capacity() == 8);
+ set.add(1, 2);
+ assertTrue(set.capacity() == 8);
+ set.add(1, 3);
+ set.add(1, 4);
+ set.add(1, 5);
+ assertTrue(set.capacity() == 12);
+ set.remove(1, 5);
+ // should not shrink yet
+ assertTrue(set.capacity() == 12);
+ set.remove(1, 4);
+ // the internal map does not keep shrinking at every remove() operation
+ assertTrue(set.capacity() == 12);
+ set.remove(1, 3);
+ set.remove(1, 2);
+ set.remove(1, 1);
+ // shrink
+ assertTrue(set.capacity() == 8);
+ }
}