You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:55 UTC
[pulsar] 15/17: [improve][broker] Use shrink map for message redelivery. (#15342)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b0eff93b8fdbee40caf269a430acbef617ed31fd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 11:06:15 2022 +0800
[improve][broker] Use shrink map for message redelivery. (#15342)
(cherry picked from commit 615f05af3e7c72d83b3fe24f64566ed58244ea5d)
---
.../broker/service/persistent/MessageRedeliveryController.java | 9 ++++++---
.../service/persistent/MessageRedeliveryControllerTest.java | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index be143565c48..c7f96fffcef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
@@ -37,7 +37,10 @@ public class MessageRedeliveryController {
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
- this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+ this.hashesToBeBlocked = allowOutOfOrderDelivery
+ ? null
+ : ConcurrentLongLongPairHashMap
+ .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
}
public boolean add(long ledgerId, long entryId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 9a785f6f95f..478677a25e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,7 +30,7 @@ import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;