You are viewing a plain-text version of this content; the canonical link was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/29 06:25:55 UTC

[pulsar] 15/17: [improve][broker] Use shrink map for message redelivery. (#15342)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0eff93b8fdbee40caf269a430acbef617ed31fd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 11:06:15 2022 +0800

    [improve][broker] Use shrink map for message redelivery. (#15342)
    
    (cherry picked from commit 615f05af3e7c72d83b3fe24f64566ed58244ea5d)
---
 .../broker/service/persistent/MessageRedeliveryController.java   | 9 ++++++---
 .../service/persistent/MessageRedeliveryControllerTest.java      | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index be143565c48..c7f96fffcef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -26,8 +26,8 @@ import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
 import org.apache.pulsar.common.util.collections.LongPairSet;
 
@@ -37,7 +37,10 @@ public class MessageRedeliveryController {
 
     public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
         this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
-        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery
+                ? null
+                : ConcurrentLongLongPairHashMap
+                    .newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
     }
 
     public boolean add(long ledgerId, long entryId) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 9a785f6f95f..478677a25e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,7 +30,7 @@ import java.lang.reflect.Field;
 import java.util.Set;
 import java.util.TreeSet;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.pulsar.common.util.collections.LongPairSet;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;