You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:07 UTC

[pulsar] 03/26: support shrink for map or set (#14663)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14d9b8492d6dc4781bc83f817ca7898966b4214d
Author: LinChen <15...@qq.com>
AuthorDate: Mon Mar 14 23:23:47 2022 +0800

    support shrink for map or set (#14663)
    
    * support shrink for map or set
    
    * check style
    
    * check style
    
    (cherry picked from commit 1d10dff757ac7b9a203c14d2085a480495fb141b)
---
 .../mledger/impl/ManagedLedgerOfflineBacklog.java  |   3 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  20 ++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  18 ++-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  18 ++-
 .../pulsar/broker/namespace/NamespaceService.java  |  13 +-
 .../org/apache/pulsar/broker/rest/TopicsBase.java  |   3 +-
 .../pulsar/broker/service/BrokerService.java       |  42 ++++--
 .../service/nonpersistent/NonPersistentTopic.java  |  12 +-
 .../service/persistent/MessageDeduplication.java   |  12 +-
 .../broker/service/persistent/PersistentTopic.java |  20 ++-
 .../broker/stats/ClusterReplicationMetrics.java    |   3 +-
 .../AntiAffinityNamespaceGroupTest.java            |  15 ++-
 .../loadbalance/impl/LoadManagerSharedTest.java    |  13 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  24 +++-
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 .../client/impl/PartitionedProducerImpl.java       |   3 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |   3 +-
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   3 +-
 .../util/collections/ConcurrentLongPairSet.java    | 148 +++++++++++++++++++--
 .../util/collections/ConcurrentOpenHashMap.java    | 140 +++++++++++++++++--
 .../util/collections/ConcurrentOpenHashSet.java    | 140 +++++++++++++++++--
 .../collections/ConcurrentSortedLongPairSet.java   |   5 +-
 .../collections/ConcurrentLongPairSetTest.java     | 111 +++++++++++++---
 .../collections/ConcurrentOpenHashMapTest.java     | 125 ++++++++++++++---
 .../collections/ConcurrentOpenHashSetTest.java     |  73 +++++++++-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   3 +-
 .../apache/pulsar/websocket/WebSocketService.java  |  23 +++-
 .../apache/pulsar/websocket/stats/ProxyStats.java  |   4 +-
 29 files changed, 860 insertions(+), 143 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index 99cc6c8842e..e0362205c6d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -220,7 +220,8 @@ public class ManagedLedgerOfflineBacklog {
         BookKeeper bk = factory.getBookKeeper();
         final CountDownLatch allCursorsCounter = new CountDownLatch(1);
         final long errorInReadingCursor = -1;
-        ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
+                ConcurrentOpenHashMap.<String, Long>newBuilder().build();
 
         final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
         final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 0c8f8a00d1c..c0ee0d2f986 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -190,7 +190,9 @@ public class LoadManagerShared {
         bundles.forEach(bundleName -> {
             final String namespaceName = getNamespaceNameFromBundleName(bundleName);
             final String bundleRange = getBundleRangeFromBundleName(bundleName);
-            target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+            target.computeIfAbsent(namespaceName,
+                    k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .add(bundleRange);
         });
     }
 
@@ -263,8 +265,12 @@ public class LoadManagerShared {
 
         for (final String broker : candidates) {
             int bundles = (int) brokerToNamespaceToBundleRange
-                    .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                    .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
+                    .computeIfAbsent(broker,
+                            k -> ConcurrentOpenHashMap.<String,
+                                    ConcurrentOpenHashSet<String>>newBuilder().build())
+                    .computeIfAbsent(namespaceName,
+                            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                    .size();
             leastBundles = Math.min(leastBundles, bundles);
             if (leastBundles == 0) {
                 break;
@@ -276,8 +282,12 @@ public class LoadManagerShared {
 
         final int finalLeastBundles = leastBundles;
         candidates.removeIf(
-                broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
+                broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
+                        k -> ConcurrentOpenHashMap.<String,
+                                ConcurrentOpenHashSet<String>>newBuilder().build())
+                        .computeIfAbsent(namespaceName,
+                                k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                        .size() > finalLeastBundles);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 326f6af4375..08620340497 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -204,7 +204,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
      */
     public ModularLoadManagerImpl() {
         brokerCandidateCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         defaultStats = new NamespaceBundleStats();
         filterPipeline = new ArrayList<>();
         loadData = new LoadData();
@@ -567,7 +570,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
             brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
             final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                     brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
+                            .computeIfAbsent(broker, k ->
+                                    ConcurrentOpenHashMap.<String,
+                                            ConcurrentOpenHashSet<String>>newBuilder()
+                                            .build());
             synchronized (namespaceToBundleRange) {
                 namespaceToBundleRange.clear();
                 LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
@@ -850,9 +856,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                 final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
                 final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                         brokerToNamespaceToBundleRange
-                                .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
+                                .computeIfAbsent(broker.get(),
+                                        k -> ConcurrentOpenHashMap.<String,
+                                                ConcurrentOpenHashSet<String>>newBuilder()
+                                                .build());
                 synchronized (namespaceToBundleRange) {
-                    namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
+                    namespaceToBundleRange.computeIfAbsent(namespaceName,
+                            k -> ConcurrentOpenHashSet.<String>newBuilder().build())
                             .add(bundleRange);
                 }
                 return broker;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 092fe2c852d..4f7e37ad344 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -202,7 +202,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
         bundleLossesCache = new HashSet<>();
         brokerCandidateCache = new HashSet<>();
         availableBrokersCache = new HashSet<>();
-        brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -851,8 +854,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                 // same broker.
                 brokerToNamespaceToBundleRange
                         .computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
-                                k -> new ConcurrentOpenHashMap<>())
-                        .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+                                k -> ConcurrentOpenHashMap.<String,
+                                        ConcurrentOpenHashSet<String>>newBuilder()
+                                        .build())
+                        .computeIfAbsent(namespaceName, k ->
+                                ConcurrentOpenHashSet.<String>newBuilder().build())
+                        .add(bundleRange);
                 ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
                 resourceUnitRankings.put(selectedRU, ranking);
             }
@@ -1271,7 +1278,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
             final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
                     brokerToNamespaceToBundleRange
-                            .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
+                            .computeIfAbsent(broker.replace("http://", ""),
+                                    k -> ConcurrentOpenHashMap.<String,
+                                            ConcurrentOpenHashSet<String>>newBuilder()
+                                            .build());
             namespaceToBundleRange.clear();
             LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
             LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5ae88adbaa1..98e65dc3e56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -165,7 +165,8 @@ public class NamespaceService implements AutoCloseable {
         this.loadManager = pulsar.getLoadManager();
         this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
         this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
-        this.namespaceClients = new ConcurrentOpenHashMap<>();
+        this.namespaceClients =
+                ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
         this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
@@ -355,9 +356,15 @@ public class NamespaceService implements AutoCloseable {
     }
 
     private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
-            findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
+            findingBundlesAuthoritative =
+            ConcurrentOpenHashMap.<NamespaceBundle,
+                    CompletableFuture<Optional<LookupResult>>>newBuilder()
+                    .build();
     private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
-            findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
+            findingBundlesNotAuthoritative =
+            ConcurrentOpenHashMap.<NamespaceBundle,
+                    CompletableFuture<Optional<LookupResult>>>newBuilder()
+                    .build();
 
     /**
      * Main internal method to lookup and setup ownership of service unit to a broker.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index f89abf9bea3..770d77794d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -431,7 +431,8 @@ public class TopicsBase extends PersistentTopicsBase {
                             partitionedTopicName, result.getLookupData());
                 }
                 pulsar().getBrokerService().getOwningTopics().computeIfAbsent(partitionedTopicName
-                                .getPartitionedTopicName(), (key) -> new ConcurrentOpenHashSet<Integer>())
+                                .getPartitionedTopicName(),
+                        (key) -> ConcurrentOpenHashSet.<Integer>newBuilder().build())
                         .add(partitionedTopicName.getPartitionIndex());
                 completeLookup(Pair.of(Collections.emptyList(), false), redirectAddresses, future);
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a388f13f2dc..b7931be9b4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -284,17 +284,28 @@ public class BrokerService implements Closeable {
         this.preciseTopicPublishRateLimitingEnable =
                 pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
         this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
-        this.topics = new ConcurrentOpenHashMap<>();
-        this.replicationClients = new ConcurrentOpenHashMap<>();
-        this.clusterAdmins = new ConcurrentOpenHashMap<>();
+        this.topics =
+                ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder()
+                .build();
+        this.replicationClients =
+                ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build();
+        this.clusterAdmins =
+                ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build();
         this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
-        this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
+        this.configRegisteredListeners =
+                ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build();
         this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
 
-        this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
-        this.owningTopics = new ConcurrentOpenHashMap<>();
+        this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String,
+                ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder()
+                .build();
+        this.owningTopics = ConcurrentOpenHashMap.<String,
+                ConcurrentOpenHashSet<Integer>>newBuilder()
+                .build();
         this.pulsarStats = new PulsarStats(pulsar);
-        this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
+        this.offlineTopicStatCache =
+                ConcurrentOpenHashMap.<TopicName,
+                        PersistentOfflineTopicStats>newBuilder().build();
 
         this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
                 .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
@@ -329,7 +340,8 @@ public class BrokerService implements Closeable {
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
         this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
-        this.blockedDispatchers = new ConcurrentOpenHashSet<>();
+        this.blockedDispatchers =
+                ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
         // update dynamic configuration and register-listener
         updateConfigurationAndRegisterListeners();
         this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
@@ -1595,8 +1607,12 @@ public class BrokerService implements Closeable {
                         synchronized (multiLayerTopicsMap) {
                             String serviceUnit = namespaceBundle.toString();
                             multiLayerTopicsMap //
-                                    .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) //
-                                    .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) //
+                                    .computeIfAbsent(topicName.getNamespace(),
+                                            k -> ConcurrentOpenHashMap.<String,
+                                                    ConcurrentOpenHashMap<String, Topic>>newBuilder()
+                                                    .build()) //
+                                    .computeIfAbsent(serviceUnit,
+                                            k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) //
                                     .put(topicName.toString(), topic);
                         }
                     }
@@ -2413,7 +2429,8 @@ public class BrokerService implements Closeable {
     }
 
     private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
-        ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
+                ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
         for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
             if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                 field.setAccessible(true);
@@ -2426,7 +2443,8 @@ public class BrokerService implements Closeable {
     }
 
     private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
-        ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap =
+                ConcurrentOpenHashMap.<String, Object>newBuilder().build();
         for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
             if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                 field.setAccessible(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index f58a4fd644f..3818b8abc2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -143,8 +143,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
     public NonPersistentTopic(String topic, BrokerService brokerService) {
         super(topic, brokerService);
 
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions =
+                ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
+        this.replicators =
+                ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         this.isFenced = false;
         registerTopicPolicyListener();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 201e6129a48..90ee3b67e3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -97,12 +97,20 @@ public class MessageDeduplication {
     // Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
     // the messages are persisted
     @VisibleForTesting
-    final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
+    final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
+            ConcurrentOpenHashMap.<String, Long>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
 
     // Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
     // after the messages are persisted
     @VisibleForTesting
-    final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
+    final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
+            ConcurrentOpenHashMap.<String, Long>newBuilder()
+            .expectedItems(16)
+            .concurrencyLevel(1)
+            .build();
 
     // Number of persisted entries after which to store a snapshot of the sequence ids map
     private final int snapshotInterval;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3914278ae7f..5a970b48f58 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -254,8 +254,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
         super(topic, brokerService);
         this.ledger = ledger;
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
+        this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         initializeRateLimiterIfNeeded(Optional.empty());
@@ -344,8 +350,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         super(topic, brokerService);
         this.ledger = ledger;
         this.messageDeduplication = messageDeduplication;
-        this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
-        this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+        this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+        this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
index 1086563085b..6718f074c67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
@@ -35,7 +35,8 @@ public class ClusterReplicationMetrics {
     public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) {
         metricsList = new ArrayList<>();
         this.localCluster = localCluster;
-        metricsMap = new ConcurrentOpenHashMap<>();
+        metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder()
+                .build();
         this.metricsEnabled = metricsEnabled;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 1429c7376f4..9e81a3e1db9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -234,7 +234,8 @@ public class AntiAffinityNamespaceGroupTest {
         brokerToDomainMap.put("brokerName-3", "domain-1");
 
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
 
         assertEquals(brokers.size(), totalBrokers);
 
@@ -320,7 +321,8 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
@@ -367,9 +369,11 @@ public class AntiAffinityNamespaceGroupTest {
     private void selectBrokerForNamespace(
             ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
             String broker, String namespace, String assignedBundleName) {
-        ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<String> bundleSet =
+                ConcurrentOpenHashSet.<String>newBuilder().build();
         bundleSet.add(assignedBundleName);
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build();
         nsToBundleMap.put(namespace, bundleSet);
         brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
     }
@@ -469,7 +473,8 @@ public class AntiAffinityNamespaceGroupTest {
 
         Set<String> brokers = Sets.newHashSet();
         Set<String> candidate = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
         brokers.add("broker-0");
         brokers.add("broker-1");
         brokers.add("broker-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
index 716b9716425..d23772185f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -36,7 +36,10 @@ public class LoadManagerSharedTest {
         String assignedBundle = namespace + "/0x00000000_0x40000000";
 
         Set<String> candidates = Sets.newHashSet();
-        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+                        .build();
         LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
         Assert.assertEquals(candidates.size(), 0);
 
@@ -80,8 +83,12 @@ public class LoadManagerSharedTest {
     private static void fillBrokerToNamespaceToBundleMap(
             ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map,
             String broker, String namespace, String bundle) {
-        map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
-                .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle);
+        map.computeIfAbsent(broker,
+                k -> ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<String>>newBuilder().build())
+                .computeIfAbsent(namespace,
+                        k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+                .add(bundle);
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 4e53819e44c..6c8b4d5f334 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -850,7 +850,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub-1", sub);
         subscriptions.put("sub-2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -954,7 +958,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub-1", sub);
         subscriptions.put("sub-2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -1081,7 +1089,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         addConsumerToSubscription.setAccessible(true);
 
         // for count consumers on topic
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         subscriptions.put("sub1", sub1);
         subscriptions.put("sub2", sub2);
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -2071,7 +2083,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     public void testCheckInactiveSubscriptions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
 
-        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         // This subscription is connected by consumer.
         PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "nonDeletableSubscription1", cursorMock, false);
         subscriptions.put(nonDeletableSubscription1.getName(), nonDeletableSubscription1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 5b14a841b8c..689c4eb7405 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -110,7 +110,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.consumerEventListener = conf.getConsumerEventListener();
         // Always use growable queue since items can exceed the advertised size
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
-        this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+        this.unAckedChunkedMessageIdSequenceMap =
+                ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
         this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
         this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0304a608f05..b4ee5a2e784 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -183,7 +183,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     protected volatile boolean paused;
 
-    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
+            ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
     private int pendingChunkedMessageCount = 0;
     protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
     private boolean expireChunkMessageTaskScheduled = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index e61e7c82166..4a84ba03ebe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -76,7 +76,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                                    int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture,
                                    Schema<T> schema, ProducerInterceptors interceptors) {
         super(client, topic, conf, producerCreatedFuture, schema, interceptors);
-        this.producers = new ConcurrentOpenHashMap<>();
+        this.producers =
+                ConcurrentOpenHashMap.<Integer, ProducerImpl<T>>newBuilder().build();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 0117e651e6f..c7b9d24151f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -50,7 +50,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         this.conf = conf;
         this.schema = schema;
         this.interceptors = interceptors;
-        this.schemaCache = new ConcurrentOpenHashMap<>();
+        this.schemaCache =
+                ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build();
         if (!conf.isMultiSchema()) {
             multiSchemaMode = MultiSchemaMode.Disabled;
         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index c0b952a281a..d577f48357c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -60,7 +60,8 @@ public class AcknowledgementsGroupingTrackerTest {
     public void setup() throws NoSuchFieldException, IllegalAccessException {
         eventLoopGroup = new NioEventLoopGroup(1);
         consumer = mock(ConsumerImpl.class);
-        consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+        consumer.unAckedChunkedMessageIdSequenceMap =
+                ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new NioEventLoopGroup()));
         PulsarClientImpl client = mock(PulsarClientImpl.class);
         doReturn(client).when(consumer).getClient();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index f1806c511e2..abbe11576a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -45,8 +45,74 @@ public class ConcurrentLongPairSet implements LongPairSet {
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section[] sections;
 
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongPairSet.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongPairSet build() {
+            return new ConcurrentLongPairSet(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+
     /**
      * Represents a function that accepts an object of the {@code LongPair} type.
      */
@@ -61,18 +127,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
         void accept(long v1, long v2);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
@@ -80,10 +161,12 @@ public class ConcurrentLongPairSet implements LongPairSet {
         this.sections = new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section(perSectionCapacity);
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
+    @Override
     public long size() {
         long size = 0;
         for (int i = 0; i < sections.length; i++) {
@@ -214,18 +297,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
         private volatile long[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater
                 .newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new long[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
             Arrays.fill(table, EmptyItem);
         }
 
@@ -314,9 +412,11 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hash set
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -347,7 +447,20 @@ public class ConcurrentLongPairSet implements LongPairSet {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hash set
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -379,6 +492,16 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 table[bucket] = EmptyItem;
                 table[bucket + 1] = EmptyItem;
                 --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedItem` state, so that we can reduce
+                // unnecessary expansions. Each entry spans two slots, so step back by 2 and clear both.
+                bucket = (bucket - 2) & (table.length - 1);
+                while (table[bucket] == DeletedItem) {
+                    table[bucket] = EmptyItem;
+                    table[bucket + 1] = EmptyItem;
+                    --usedBuckets;
+                    bucket = (bucket - 2) & (table.length - 1);
+                }
             } else {
                 table[bucket] = DeletedItem;
                 table[bucket + 1] = DeletedItem;
@@ -392,6 +515,9 @@ public class ConcurrentLongPairSet implements LongPairSet {
                 Arrays.fill(table, EmptyItem);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -431,9 +557,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newTable = new long[2 * newCapacity];
             Arrays.fill(newTable, EmptyItem);
 
@@ -451,7 +576,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
             // Capacity needs to be updated after the values, so that we won't see
             // a capacity value bigger than the actual array size
             capacity = newCapacity;
-            resizeThreshold = (int) (capacity * SetFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, long item1, long item2) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 5966a95e5f3..1ccbeb3b6b5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -64,33 +64,112 @@ public class ConcurrentOpenHashMap<K, V> {
         }
     };
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<K, V>[] sections;
 
+    public static <K, V> Builder<K, V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashMap.
+     */
+    public static class Builder<K, V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<K, V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<K, V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<K, V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<K, V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<K, V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<K, V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<K, V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashMap<K, V> build() {
+            return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<K, V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -208,18 +287,33 @@ public class ConcurrentOpenHashMap<K, V> {
         private volatile Object[] table;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.table = new Object[2 * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(K key, int keyHash) {
@@ -316,9 +410,11 @@ public class ConcurrentOpenHashMap<K, V> {
                     bucket = (bucket + 2) & (table.length - 1);
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -363,7 +459,20 @@ public class ConcurrentOpenHashMap<K, V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -374,6 +483,9 @@ public class ConcurrentOpenHashMap<K, V> {
                 Arrays.fill(table, EmptyKey);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -415,9 +527,8 @@ public class ConcurrentOpenHashMap<K, V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             Object[] newTable = new Object[2 * newCapacity];
 
             // Re-hash table
@@ -432,7 +543,8 @@ public class ConcurrentOpenHashMap<K, V> {
             table = newTable;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 8e0e69d32df..28f0df0ff20 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -43,33 +43,112 @@ public class ConcurrentOpenHashSet<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
     private final Section<V>[] sections;
 
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentOpenHashSet.
+     */
+    public static class Builder<V> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<V> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<V> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<V> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<V> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<V> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<V> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<V> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentOpenHashSet<V> build() {
+            return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    @Deprecated
     public ConcurrentOpenHashSet() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -177,18 +256,33 @@ public class ConcurrentOpenHashSet<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         boolean contains(V value, int keyHash) {
@@ -284,9 +378,11 @@ public class ConcurrentOpenHashSet<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets > resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        // Expand the hash set
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -319,7 +415,20 @@ public class ConcurrentOpenHashSet<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // Shrink the hash set
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -330,6 +439,9 @@ public class ConcurrentOpenHashSet<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -402,9 +514,8 @@ public class ConcurrentOpenHashSet<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             V[] newValues = (V[]) new Object[newCapacity];
 
             // Re-hash table
@@ -418,7 +529,8 @@ public class ConcurrentOpenHashSet<V> {
             values = newValues;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertValueNoLock(V[] values, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 95e2302dcb7..e4cb668fc92 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -79,7 +79,10 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
     @Override
     public boolean add(long item1, long item2) {
         ConcurrentLongPairSet messagesToReplay = longPairSets.computeIfAbsent(item1,
-                (key) -> new ConcurrentLongPairSet(expectedItems, concurrencyLevel));
+                (key) -> ConcurrentLongPairSet.newBuilder()
+                        .expectedItems(expectedItems)
+                        .concurrencyLevel(concurrencyLevel)
+                        .build());
         return messagesToReplay.add(item1, item2);
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index 82cac712975..a8d3e1d0603 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -45,21 +45,29 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongPairSet(0);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongPairSet(16, 0);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongPairSet(4, 8);
+            ConcurrentLongPairSet.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -68,7 +76,9 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(16);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1, 1));
@@ -94,9 +104,64 @@ public class ConcurrentLongPairSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1, 1));
+        assertTrue(map.add(2, 2));
+        assertTrue(map.add(3, 3));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add(1, 1));
+        assertTrue(map.add(2, 2));
+        assertTrue(map.add(3, 3));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add(4, 4));
+        assertTrue(map.add(5, 5));
+        assertTrue(map.capacity() == 8);
+
+        // Verify that the set does not shrink again on every remove() operation
+        assertTrue(map.add(6, 6));
+        assertTrue(map.remove(6, 6));
+        assertTrue(map.capacity() == 8);
+    }
+
+
     @Test
     public void testRemove() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add(1, 1));
@@ -111,7 +176,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -126,7 +194,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashingRemoval() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -152,7 +223,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(set.capacity(), n);
         assertEquals(set.size(), 0);
 
@@ -177,7 +251,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -210,7 +284,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongPairSet map = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -243,7 +317,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         assertEquals(set.items(), Collections.emptyList());
 
@@ -269,7 +343,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testRemoval() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
@@ -295,7 +369,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testIfRemoval() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
@@ -319,7 +393,7 @@ public class ConcurrentLongPairSetTest {
 
     @Test
     public void testItems() {
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         int n = 100;
         int limit = 10;
@@ -340,7 +414,10 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet(Buckets, 1);
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -375,7 +452,7 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testEqualsObjects() {
 
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         long t1 = 1;
         long t2 = 2;
@@ -397,7 +474,7 @@ public class ConcurrentLongPairSetTest {
     @Test
     public void testToString() {
 
-        ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+        ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
 
         set.add(0, 0);
         set.add(1, 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 254be51f292..7919485d9b6 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -49,21 +49,29 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentOpenHashMap<String, String>(0);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(16, 0);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentOpenHashMap<String, String>(4, 8);
+            ConcurrentOpenHashMap.<String, String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -72,7 +80,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16);
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -98,9 +109,64 @@ public class ConcurrentOpenHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put("k1", "v1"));
+        assertNull(map.put("k2", "v2"));
+        assertNull(map.put("k3", "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1", "v1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2", "v2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put("k4", "v4"));
+        assertNull(map.put("k5", "v5"));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertNull(map.put("k6", "v6"));
+        assertTrue(map.remove("k6", "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder().build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put("1", "one"));
@@ -117,7 +183,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentOpenHashMap<String, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<String, Integer> map = ConcurrentOpenHashMap.<String, Integer>newBuilder()
+                        .expectedItems(n / 2)
+                        .concurrencyLevel(1)
+                        .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -132,7 +201,11 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map =
+                ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                        .expectedItems(n / 2)
+                        .concurrencyLevel(1)
+                        .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -154,7 +227,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -188,7 +264,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -222,7 +299,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -266,7 +344,10 @@ public class ConcurrentOpenHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(Buckets, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -299,7 +380,8 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<Long, String> map =
+                ConcurrentOpenHashMap.<Long, String>newBuilder().build();
         assertNull(map.putIfAbsent(1l, "one"));
         assertEquals(map.get(1l), "one");
 
@@ -309,7 +391,10 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<Integer, Integer> map = ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         Function<Integer, Integer> provider = key -> counter.getAndIncrement();
 
@@ -350,7 +435,8 @@ public class ConcurrentOpenHashMapTest {
             }
         }
 
-        ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+        ConcurrentOpenHashMap<T, String> map =
+                ConcurrentOpenHashMap.<T, String>newBuilder().build();
 
         T t1 = new T(1);
         T t1_b = new T(1);
@@ -372,7 +458,11 @@ public class ConcurrentOpenHashMapTest {
 
     @Test
     public void testNullValue() {
-        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+        ConcurrentOpenHashMap<String, String> map =
+                ConcurrentOpenHashMap.<String, String>newBuilder()
+                        .expectedItems(16)
+                        .concurrencyLevel(1)
+                        .build();
         String key = "a";
         assertThrows(NullPointerException.class, () -> map.put(key, null));
 
@@ -406,7 +496,10 @@ public class ConcurrentOpenHashMapTest {
     static final int N = 1_000_000;
 
     public void benchConcurrentOpenHashMap() throws Exception {
-        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(N, 1);
+        ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index 3c1d99668d7..af62948b64a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,9 +91,66 @@ public class ConcurrentOpenHashSetTest {
         assertEquals(set.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentOpenHashSet<String> map =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentOpenHashSet<String> map =
+                ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.add("k1"));
+        assertTrue(map.add("k2"));
+        assertTrue(map.add("k3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove("k1"));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove("k2"));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.add("k4"));
+        assertTrue(map.add("k5"));
+        assertTrue(map.capacity() == 8);
+
+        // Verify that the set does not shrink again on every remove() operation
+        assertTrue(map.add("k6"));
+        assertTrue(map.remove("k6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<String> set =
+                ConcurrentOpenHashSet.<String>newBuilder().build();
 
         assertTrue(set.isEmpty());
         assertTrue(set.add("1"));
@@ -145,7 +202,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -178,7 +236,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> map =
+                ConcurrentOpenHashSet.<Long>newBuilder().build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -211,7 +270,7 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void testIteration() {
-        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Long> set = ConcurrentOpenHashSet.<Long>newBuilder().build();
 
         assertEquals(set.values(), Collections.emptyList());
 
@@ -237,7 +296,8 @@ public class ConcurrentOpenHashSetTest {
 
     @Test
     public void testRemoval() {
-        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<Integer> set =
+                ConcurrentOpenHashSet.<Integer>newBuilder().build();
 
         set.add(0);
         set.add(1);
@@ -315,7 +375,8 @@ public class ConcurrentOpenHashSetTest {
             }
         }
 
-        ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+        ConcurrentOpenHashSet<T> set =
+                ConcurrentOpenHashSet.<T>newBuilder().build();
 
         T t1 = new T(1);
         T t1_b = new T(1);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 8e85618f3cc..1ea232203d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -120,7 +120,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
     PulsarDispatchingRowDecoderFactory decoderFactory;
 
-    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
+            ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
 
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index ee607687b15..5a81d9f21a2 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -81,9 +81,17 @@ public class WebSocketService implements Closeable {
     public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
         this.config = config;
         this.localCluster = localCluster;
-        this.topicProducerMap = new ConcurrentOpenHashMap<>();
-        this.topicConsumerMap = new ConcurrentOpenHashMap<>();
-        this.topicReaderMap = new ConcurrentOpenHashMap<>();
+        this.topicProducerMap =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<ProducerHandler>>newBuilder()
+                        .build();
+        this.topicConsumerMap =
+                ConcurrentOpenHashMap.<String,
+                        ConcurrentOpenHashSet<ConsumerHandler>>newBuilder()
+                        .build();
+        this.topicReaderMap =
+                ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ReaderHandler>>newBuilder()
+                        .build();
         this.proxyStats = new ProxyStats(this);
     }
 
@@ -249,7 +257,8 @@ public class WebSocketService implements Closeable {
 
     public boolean addProducer(ProducerHandler producer) {
         return topicProducerMap
-                .computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+                .computeIfAbsent(producer.getProducer().getTopic(),
+                        topic -> ConcurrentOpenHashSet.<ProducerHandler>newBuilder().build())
                 .add(producer);
     }
 
@@ -267,7 +276,8 @@ public class WebSocketService implements Closeable {
 
     public boolean addConsumer(ConsumerHandler consumer) {
         return topicConsumerMap
-                .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+                .computeIfAbsent(consumer.getConsumer().getTopic(), topic ->
+                        ConcurrentOpenHashSet.<ConsumerHandler>newBuilder().build())
                 .add(consumer);
     }
 
@@ -284,7 +294,8 @@ public class WebSocketService implements Closeable {
     }
 
     public boolean addReader(ReaderHandler reader) {
-        return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+        return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic ->
+                ConcurrentOpenHashSet.<ReaderHandler>newBuilder().build())
                 .add(reader);
     }
 
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
index cc327f57191..7fd75c9b3d9 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
@@ -48,7 +48,9 @@ public class ProxyStats {
         super();
         this.service = service;
         this.jvmMetrics = new JvmMetrics(service);
-        this.topicStats = new ConcurrentOpenHashMap<>();
+        this.topicStats =
+                ConcurrentOpenHashMap.<String, ProxyNamespaceStats>newBuilder()
+                        .build();
         this.metricsCollection = new ArrayList<>();
         this.tempMetricsCollection = new ArrayList<>();
         // schedule stat generation task every 1 minute