You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:15 UTC
[pulsar] 04/15: support shrink for map or set (#14663)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7ca9bf818deafa263c57329a99efed38644e44e0
Author: LinChen <15...@qq.com>
AuthorDate: Mon Mar 14 23:23:47 2022 +0800
support shrink for map or set (#14663)
* support shrink for map or set
* check style
* check style
(cherry picked from commit 1d10dff757ac7b9a203c14d2085a480495fb141b)
---
.../mledger/impl/ManagedLedgerOfflineBacklog.java | 3 +-
.../broker/loadbalance/impl/LoadManagerShared.java | 20 ++-
.../loadbalance/impl/ModularLoadManagerImpl.java | 18 ++-
.../loadbalance/impl/SimpleLoadManagerImpl.java | 18 ++-
.../pulsar/broker/namespace/NamespaceService.java | 13 +-
.../pulsar/broker/service/BrokerService.java | 39 ++++--
.../service/nonpersistent/NonPersistentTopic.java | 13 +-
.../service/persistent/MessageDeduplication.java | 12 +-
.../broker/service/persistent/PersistentTopic.java | 20 ++-
.../broker/stats/ClusterReplicationMetrics.java | 3 +-
.../AntiAffinityNamespaceGroupTest.java | 15 ++-
.../loadbalance/impl/LoadManagerSharedTest.java | 13 +-
.../pulsar/broker/service/PersistentTopicTest.java | 24 +++-
.../apache/pulsar/client/impl/ConsumerBase.java | 3 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
.../apache/pulsar/client/impl/ProducerBase.java | 3 +-
.../impl/AcknowledgementsGroupingTrackerTest.java | 3 +-
.../util/collections/ConcurrentLongPairSet.java | 148 +++++++++++++++++++--
.../util/collections/ConcurrentOpenHashMap.java | 140 +++++++++++++++++--
.../util/collections/ConcurrentOpenHashSet.java | 140 +++++++++++++++++--
.../collections/ConcurrentSortedLongPairSet.java | 5 +-
.../collections/ConcurrentLongPairSetTest.java | 111 +++++++++++++---
.../collections/ConcurrentOpenHashMapTest.java | 125 ++++++++++++++---
.../collections/ConcurrentOpenHashSetTest.java | 73 +++++++++-
.../apache/pulsar/websocket/WebSocketService.java | 23 +++-
.../apache/pulsar/websocket/stats/ProxyStats.java | 4 +-
26 files changed, 853 insertions(+), 139 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
index e00dd47a739..d258c1ca339 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java
@@ -220,7 +220,8 @@ public class ManagedLedgerOfflineBacklog {
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
final long errorInReadingCursor = -1;
- ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, Long> ledgerRetryMap =
+ ConcurrentOpenHashMap.<String, Long>newBuilder().build();
final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 2a217049a81..5c969e337b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -194,7 +194,9 @@ public class LoadManagerShared {
bundles.forEach(bundleName -> {
final String namespaceName = getNamespaceNameFromBundleName(bundleName);
final String bundleRange = getBundleRangeFromBundleName(bundleName);
- target.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+ target.computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundleRange);
});
}
@@ -267,8 +269,12 @@ public class LoadManagerShared {
for (final String broker : candidates) {
int bundles = (int) brokerToNamespaceToBundleRange
- .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
- .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size();
+ .computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder().build())
+ .computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .size();
leastBundles = Math.min(leastBundles, bundles);
if (leastBundles == 0) {
break;
@@ -280,8 +286,12 @@ public class LoadManagerShared {
final int finalLeastBundles = leastBundles;
candidates.removeIf(
- broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
- .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).size() > finalLeastBundles);
+ broker -> brokerToNamespaceToBundleRange.computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder().build())
+ .computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .size() > finalLeastBundles);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9349e68f9e3..9c7c73e975d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -201,7 +201,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
*/
public ModularLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
- brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+ brokerToNamespaceToBundleRange =
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+ .build();
defaultStats = new NamespaceBundleStats();
filterPipeline = new ArrayList<>();
loadData = new LoadData();
@@ -543,7 +546,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
brokerData.getTimeAverageData().reset(statsMap.keySet(), bundleData, defaultStats);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
- .computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>());
+ .computeIfAbsent(broker, k ->
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
synchronized (namespaceToBundleRange) {
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(statsMap.keySet(), namespaceToBundleRange);
@@ -826,9 +832,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.get(), k -> new ConcurrentOpenHashMap<>());
+ .computeIfAbsent(broker.get(),
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
synchronized (namespaceToBundleRange) {
- namespaceToBundleRange.computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>())
+ namespaceToBundleRange.computeIfAbsent(namespaceName,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
.add(bundleRange);
}
return broker;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index b14a018b0ec..6fa5e0da5e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -196,7 +196,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
bundleLossesCache = new HashSet<>();
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
- brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+ brokerToNamespaceToBundleRange =
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+ .build();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -834,8 +837,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
// same broker.
brokerToNamespaceToBundleRange
.computeIfAbsent(selectedRU.getResourceId().replace("http://", ""),
- k -> new ConcurrentOpenHashMap<>())
- .computeIfAbsent(namespaceName, k -> new ConcurrentOpenHashSet<>()).add(bundleRange);
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder()
+ .build())
+ .computeIfAbsent(namespaceName, k ->
+ ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundleRange);
ranking.addPreAllocatedServiceUnit(serviceUnitId, quota);
resourceUnitRankings.put(selectedRU, ranking);
}
@@ -1252,7 +1259,10 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
final Set<String> preallocatedBundles = resourceUnitRankings.get(resourceUnit).getPreAllocatedBundles();
final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> namespaceToBundleRange =
brokerToNamespaceToBundleRange
- .computeIfAbsent(broker.replace("http://", ""), k -> new ConcurrentOpenHashMap<>());
+ .computeIfAbsent(broker.replace("http://", ""),
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder()
+ .build());
namespaceToBundleRange.clear();
LoadManagerShared.fillNamespaceToBundlesMap(loadedBundles, namespaceToBundleRange);
LoadManagerShared.fillNamespaceToBundlesMap(preallocatedBundles, namespaceToBundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index d2c87be9581..36c5cc31c11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -175,7 +175,8 @@ public class NamespaceService implements AutoCloseable {
this.loadManager = pulsar.getLoadManager();
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
- this.namespaceClients = new ConcurrentOpenHashMap<>();
+ this.namespaceClients =
+ ConcurrentOpenHashMap.<ClusterDataImpl, PulsarClientImpl>newBuilder().build();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.localPoliciesCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalPolicies.class);
@@ -363,9 +364,15 @@ public class NamespaceService implements AutoCloseable {
}
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
- findingBundlesAuthoritative = new ConcurrentOpenHashMap<>();
+ findingBundlesAuthoritative =
+ ConcurrentOpenHashMap.<NamespaceBundle,
+ CompletableFuture<Optional<LookupResult>>>newBuilder()
+ .build();
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>>
- findingBundlesNotAuthoritative = new ConcurrentOpenHashMap<>();
+ findingBundlesNotAuthoritative =
+ ConcurrentOpenHashMap.<NamespaceBundle,
+ CompletableFuture<Optional<LookupResult>>>newBuilder()
+ .build();
/**
* Main internal method to lookup and setup ownership of service unit to a broker.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0003121a077..9650c101cd8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -279,16 +279,26 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
- this.topics = new ConcurrentOpenHashMap<>();
- this.replicationClients = new ConcurrentOpenHashMap<>();
- this.clusterAdmins = new ConcurrentOpenHashMap<>();
+ this.topics =
+ ConcurrentOpenHashMap.<String, CompletableFuture<Optional<Topic>>>newBuilder()
+ .build();
+ this.replicationClients =
+ ConcurrentOpenHashMap.<String, PulsarClient>newBuilder().build();
+ this.clusterAdmins =
+ ConcurrentOpenHashMap.<String, PulsarAdmin>newBuilder().build();
this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
- this.configRegisteredListeners = new ConcurrentOpenHashMap<>();
+ this.configRegisteredListeners =
+ ConcurrentOpenHashMap.<String, Consumer<?>>newBuilder().build();
this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
- this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
+ this.multiLayerTopicsMap = ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>>newBuilder()
+ .build();
+
this.pulsarStats = new PulsarStats(pulsar);
- this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
+ this.offlineTopicStatCache =
+ ConcurrentOpenHashMap.<TopicName,
+ PersistentOfflineTopicStats>newBuilder().build();
this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic())
@@ -327,7 +337,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
- this.blockedDispatchers = new ConcurrentOpenHashSet<>();
+ this.blockedDispatchers =
+ ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
@@ -1530,8 +1541,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
synchronized (multiLayerTopicsMap) {
String serviceUnit = namespaceBundle.toString();
multiLayerTopicsMap //
- .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) //
- .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) //
+ .computeIfAbsent(topicName.getNamespace(),
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashMap<String, Topic>>newBuilder()
+ .build()) //
+ .computeIfAbsent(serviceUnit,
+ k -> ConcurrentOpenHashMap.<String, Topic>newBuilder().build()) //
.put(topicName.toString(), topic);
}
}
@@ -2304,7 +2319,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
- ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap =
+ ConcurrentOpenHashMap.<String, ConfigField>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
@@ -2317,7 +2333,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() {
- ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap =
+ ConcurrentOpenHashMap.<String, Object>newBuilder().build();
for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
if (field != null && field.isAnnotationPresent(FieldContext.class)) {
field.setAccessible(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 02002b13273..50ac27ff352 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -136,8 +136,17 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);
- this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
- this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+
+ this.subscriptions =
+ ConcurrentOpenHashMap.<String, NonPersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ this.replicators =
+ ConcurrentOpenHashMap.<String, NonPersistentReplicator>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
this.isFenced = false;
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 613c5a3116b..ff44fa1ed7f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -104,12 +104,20 @@ public class MessageDeduplication {
// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
@VisibleForTesting
- final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
+ final ConcurrentOpenHashMap<String, Long> highestSequencedPushed =
+ ConcurrentOpenHashMap.<String, Long>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
@VisibleForTesting
- final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
+ final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted =
+ ConcurrentOpenHashMap.<String, Long>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 977f74387ab..c93eda3aada 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -257,8 +257,14 @@ public class PersistentTopic extends AbstractTopic
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
super(topic, brokerService);
this.ledger = ledger;
- this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
- this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+ this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
this.delayedDeliveryTickTimeMillis =
brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
@@ -346,8 +352,14 @@ public class PersistentTopic extends AbstractTopic
super(topic, brokerService);
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
- this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
- this.replicators = new ConcurrentOpenHashMap<>(16, 1);
+ this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ this.replicators = ConcurrentOpenHashMap.<String, Replicator>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
index 1086563085b..6718f074c67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ClusterReplicationMetrics.java
@@ -35,7 +35,8 @@ public class ClusterReplicationMetrics {
public ClusterReplicationMetrics(String localCluster, boolean metricsEnabled) {
metricsList = new ArrayList<>();
this.localCluster = localCluster;
- metricsMap = new ConcurrentOpenHashMap<>();
+ metricsMap = ConcurrentOpenHashMap.<String, ReplicationMetrics>newBuilder()
+ .build();
this.metricsEnabled = metricsEnabled;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index cca562db4e0..5a92b739a0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -234,7 +234,8 @@ public class AntiAffinityNamespaceGroupTest {
brokerToDomainMap.put("brokerName-3", "domain-1");
Set<String> candidate = Sets.newHashSet();
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+ ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
assertEquals(brokers.size(), totalBrokers);
@@ -320,7 +321,8 @@ public class AntiAffinityNamespaceGroupTest {
Set<String> brokers = Sets.newHashSet();
Set<String> candidate = Sets.newHashSet();
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+ ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
@@ -367,9 +369,11 @@ public class AntiAffinityNamespaceGroupTest {
private void selectBrokerForNamespace(
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange,
String broker, String namespace, String assignedBundleName) {
- ConcurrentOpenHashSet<String> bundleSet = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<String> bundleSet =
+ ConcurrentOpenHashSet.<String>newBuilder().build();
bundleSet.add(assignedBundleName);
- ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>> nsToBundleMap =
+ ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<String>>newBuilder().build();
nsToBundleMap.put(namespace, bundleSet);
brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
}
@@ -475,7 +479,8 @@ public class AntiAffinityNamespaceGroupTest {
Set<String> brokers = Sets.newHashSet();
Set<String> candidate = Sets.newHashSet();
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange =
+ ConcurrentOpenHashMap.<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder().build();
brokers.add("broker-0");
brokers.add("broker-1");
brokers.add("broker-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
index 716b9716425..d23772185f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerSharedTest.java
@@ -36,7 +36,10 @@ public class LoadManagerSharedTest {
String assignedBundle = namespace + "/0x00000000_0x40000000";
Set<String> candidates = Sets.newHashSet();
- ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map =
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>>newBuilder()
+ .build();
LoadManagerShared.removeMostServicingBrokersForNamespace(assignedBundle, candidates, map);
Assert.assertEquals(candidates.size(), 0);
@@ -80,8 +83,12 @@ public class LoadManagerSharedTest {
private static void fillBrokerToNamespaceToBundleMap(
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> map,
String broker, String namespace, String bundle) {
- map.computeIfAbsent(broker, k -> new ConcurrentOpenHashMap<>())
- .computeIfAbsent(namespace, k -> new ConcurrentOpenHashSet<>()).add(bundle);
+ map.computeIfAbsent(broker,
+ k -> ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<String>>newBuilder().build())
+ .computeIfAbsent(namespace,
+ k -> ConcurrentOpenHashSet.<String>newBuilder().build())
+ .add(bundle);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 5afbdafff2b..3f530a53804 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -848,7 +848,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
addConsumerToSubscription.setAccessible(true);
// for count consumers on topic
- ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+ ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
subscriptions.put("sub-1", sub);
subscriptions.put("sub-2", sub2);
Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -943,7 +947,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
addConsumerToSubscription.setAccessible(true);
// for count consumers on topic
- ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+ ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
subscriptions.put("sub-1", sub);
subscriptions.put("sub-2", sub2);
Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -1064,7 +1072,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
addConsumerToSubscription.setAccessible(true);
// for count consumers on topic
- ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+ ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
subscriptions.put("sub1", sub1);
subscriptions.put("sub2", sub2);
Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -2082,7 +2094,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
public void testCheckInactiveSubscriptions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
- ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+ ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
// This subscription is connected by consumer.
PersistentSubscription nonDeletableSubscription1 = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "nonDeletableSubscription1", cursorMock, false);
subscriptions.put(nonDeletableSubscription1.getName(), nonDeletableSubscription1);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 09b840fa502..f3d02343d2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -99,7 +99,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
- this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+ this.unAckedChunkedMessageIdSequenceMap =
+ ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0d5e63a1e52..a0491f86b96 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -174,7 +174,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected volatile boolean paused;
- protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+ protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
+ ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
private int pendingChunkedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
private boolean expireChunkMessageTaskScheduled = false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 51450e16ce4..0ef8550a7df 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -52,7 +52,8 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
this.conf = conf;
this.schema = schema;
this.interceptors = interceptors;
- this.schemaCache = new ConcurrentOpenHashMap<>();
+ this.schemaCache =
+ ConcurrentOpenHashMap.<SchemaHash, byte[]>newBuilder().build();
if (!conf.isMultiSchema()) {
multiSchemaMode = MultiSchemaMode.Disabled;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
index c0b952a281a..d577f48357c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java
@@ -60,7 +60,8 @@ public class AcknowledgementsGroupingTrackerTest {
public void setup() throws NoSuchFieldException, IllegalAccessException {
eventLoopGroup = new NioEventLoopGroup(1);
consumer = mock(ConsumerImpl.class);
- consumer.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
+ consumer.unAckedChunkedMessageIdSequenceMap =
+ ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
cnx = spy(new ClientCnxTest(new ClientConfigurationData(), new NioEventLoopGroup()));
PulsarClientImpl client = mock(PulsarClientImpl.class);
doReturn(client).when(consumer).getClient();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index f1806c511e2..abbe11576a9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -45,8 +45,74 @@ public class ConcurrentLongPairSet implements LongPairSet {
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section[] sections;
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder of ConcurrentLongPairSet.
+ */
+ public static class Builder {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongPairSet build() {
+ return new ConcurrentLongPairSet(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
+
/**
* Represents a function that accepts an object of the {@code LongPair} type.
*/
@@ -61,18 +127,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
void accept(long v1, long v2);
}
+ @Deprecated
public ConcurrentLongPairSet() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongPairSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongPairSet(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
@@ -80,10 +161,12 @@ public class ConcurrentLongPairSet implements LongPairSet {
this.sections = new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section(perSectionCapacity);
+ sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
+ @Override
public long size() {
long size = 0;
for (int i = 0; i < sections.length; i++) {
@@ -214,18 +297,33 @@ public class ConcurrentLongPairSet implements LongPairSet {
private volatile long[] table;
private volatile int capacity;
+ private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new long[2 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
Arrays.fill(table, EmptyItem);
}
@@ -314,9 +412,11 @@ public class ConcurrentLongPairSet implements LongPairSet {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -347,7 +447,20 @@ public class ConcurrentLongPairSet implements LongPairSet {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -379,6 +492,16 @@ public class ConcurrentLongPairSet implements LongPairSet {
table[bucket] = EmptyItem;
table[bucket + 1] = EmptyItem;
--usedBuckets;
+
+ // Cleanup all the buckets that were in `DeletedKey` state,
+ // so that we can reduce unnecessary expansions
+ bucket = (bucket - 1) & (table.length - 1);
+ while (table[bucket] == DeletedItem) {
+ table[bucket] = EmptyItem;
+ --usedBuckets;
+
+ bucket = (bucket - 1) & (table.length - 1);
+ }
} else {
table[bucket] = DeletedItem;
table[bucket + 1] = DeletedItem;
@@ -392,6 +515,9 @@ public class ConcurrentLongPairSet implements LongPairSet {
Arrays.fill(table, EmptyItem);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -431,9 +557,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Resize the hashmap to newCapacity (used for both expansion and shrinking)
- int newCapacity = capacity * 2;
long[] newTable = new long[2 * newCapacity];
Arrays.fill(newTable, EmptyItem);
@@ -451,7 +576,8 @@ public class ConcurrentLongPairSet implements LongPairSet {
// Capacity needs to be updated after the values, so that we won't see
// a capacity value bigger than the actual array size
capacity = newCapacity;
- resizeThreshold = (int) (capacity * SetFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static void insertKeyValueNoLock(long[] table, int capacity, long item1, long item2) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 2c7eed1b58e..255844cf4ba 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -64,33 +64,112 @@ public class ConcurrentOpenHashMap<K, V> {
}
};
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section<K, V>[] sections;
+ public static <K, V> Builder<K, V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentOpenHashMap.
+ */
+ public static class Builder<K, V> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<K, V> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<K, V> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<K, V> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<K, V> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<K, V> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<K, V> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<K, V> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentOpenHashMap<K, V> build() {
+ return new ConcurrentOpenHashMap<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
+ @Deprecated
public ConcurrentOpenHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentOpenHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<K, V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -208,18 +287,33 @@ public class ConcurrentOpenHashMap<K, V> {
private volatile Object[] table;
private volatile int capacity;
+ private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.table = new Object[2 * this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
V get(K key, int keyHash) {
@@ -316,9 +410,11 @@ public class ConcurrentOpenHashMap<K, V> {
bucket = (bucket + 2) & (table.length - 1);
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -363,7 +459,20 @@ public class ConcurrentOpenHashMap<K, V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -374,6 +483,9 @@ public class ConcurrentOpenHashMap<K, V> {
Arrays.fill(table, EmptyKey);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -415,9 +527,8 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Resize the hashmap to newCapacity (used for both expansion and shrinking)
- int newCapacity = capacity * 2;
Object[] newTable = new Object[2 * newCapacity];
// Re-hash table
@@ -432,7 +543,8 @@ public class ConcurrentOpenHashMap<K, V> {
table = newTable;
capacity = newCapacity;
usedBuckets = size;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 8b77d9052b3..6dc8552174e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -43,33 +43,112 @@ public class ConcurrentOpenHashSet<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
private final Section<V>[] sections;
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentOpenHashSet.
+ */
+ public static class Builder<V> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<V> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<V> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<V> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<V> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<V> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<V> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<V> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentOpenHashSet<V> build() {
+ return new ConcurrentOpenHashSet<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
+ @Deprecated
public ConcurrentOpenHashSet() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentOpenHashSet(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -177,18 +256,33 @@ public class ConcurrentOpenHashSet<V> {
private volatile V[] values;
private volatile int capacity;
+ private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
boolean contains(V value, int keyHash) {
@@ -284,9 +378,11 @@ public class ConcurrentOpenHashSet<V> {
++bucket;
}
} finally {
- if (usedBuckets > resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ // Expand the hashmap
+ int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -319,7 +415,20 @@ public class ConcurrentOpenHashSet<V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -330,6 +439,9 @@ public class ConcurrentOpenHashSet<V> {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -402,9 +514,8 @@ public class ConcurrentOpenHashSet<V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Resize the hashmap to newCapacity (used for both expansion and shrinking)
- int newCapacity = capacity * 2;
V[] newValues = (V[]) new Object[newCapacity];
// Re-hash table
@@ -418,7 +529,8 @@ public class ConcurrentOpenHashSet<V> {
values = newValues;
capacity = newCapacity;
usedBuckets = size;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <V> void insertValueNoLock(V[] values, V value) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
index 95e2302dcb7..e4cb668fc92 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java
@@ -79,7 +79,10 @@ public class ConcurrentSortedLongPairSet implements LongPairSet {
@Override
public boolean add(long item1, long item2) {
ConcurrentLongPairSet messagesToReplay = longPairSets.computeIfAbsent(item1,
- (key) -> new ConcurrentLongPairSet(expectedItems, concurrencyLevel));
+ (key) -> ConcurrentLongPairSet.newBuilder()
+ .expectedItems(expectedItems)
+ .concurrencyLevel(concurrencyLevel)
+ .build());
return messagesToReplay.add(item1, item2);
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
index 82cac712975..a8d3e1d0603 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java
@@ -45,21 +45,29 @@ public class ConcurrentLongPairSetTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongPairSet(0);
+ ConcurrentLongPairSet.newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongPairSet(16, 0);
+ ConcurrentLongPairSet.newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongPairSet(4, 8);
+ ConcurrentLongPairSet.newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -68,7 +76,9 @@ public class ConcurrentLongPairSetTest {
@Test
public void simpleInsertions() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet(16);
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(set.isEmpty());
assertTrue(set.add(1, 1));
@@ -94,9 +104,64 @@ public class ConcurrentLongPairSetTest {
assertEquals(set.size(), 3);
}
+ @Test
+ public void testClear() {
+ ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add(1, 1));
+ assertTrue(map.add(2, 2));
+ assertTrue(map.add(3, 3));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add(1, 1));
+ assertTrue(map.add(2, 2));
+ assertTrue(map.add(3, 3));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1, 1));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2, 2));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.add(4, 4));
+ assertTrue(map.add(5, 5));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove() operation
+ assertTrue(map.add(6, 6));
+ assertTrue(map.remove(6, 6));
+ assertTrue(map.capacity() == 8);
+ }
+
+
@Test
public void testRemove() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
assertTrue(set.isEmpty());
assertTrue(set.add(1, 1));
@@ -111,7 +176,10 @@ public class ConcurrentLongPairSetTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -126,7 +194,10 @@ public class ConcurrentLongPairSetTest {
@Test
public void testRehashingRemoval() {
int n = 16;
- ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -152,7 +223,10 @@ public class ConcurrentLongPairSetTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongPairSet set = new ConcurrentLongPairSet(n / 2, 1);
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(set.capacity(), n);
assertEquals(set.size(), 0);
@@ -177,7 +251,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -210,7 +284,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongPairSet map = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet map = ConcurrentLongPairSet.newBuilder().build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -243,7 +317,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testIteration() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
assertEquals(set.items(), Collections.emptyList());
@@ -269,7 +343,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testRemoval() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
set.add(0, 0);
set.add(1, 1);
@@ -295,7 +369,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testIfRemoval() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
set.add(0, 0);
set.add(1, 1);
@@ -319,7 +393,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testItems() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
int n = 100;
int limit = 10;
@@ -340,7 +414,10 @@ public class ConcurrentLongPairSetTest {
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
- ConcurrentLongPairSet set = new ConcurrentLongPairSet(Buckets, 1);
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder()
+ .expectedItems(Buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -375,7 +452,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testEqualsObjects() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
long t1 = 1;
long t2 = 2;
@@ -397,7 +474,7 @@ public class ConcurrentLongPairSetTest {
@Test
public void testToString() {
- ConcurrentLongPairSet set = new ConcurrentLongPairSet();
+ ConcurrentLongPairSet set = ConcurrentLongPairSet.newBuilder().build();
set.add(0, 0);
set.add(1, 1);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
index 254be51f292..7919485d9b6 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java
@@ -49,21 +49,29 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentOpenHashMap<String, String>(0);
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashMap<String, String>(16, 0);
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentOpenHashMap<String, String>(4, 8);
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -72,7 +80,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void simpleInsertions() {
- ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16);
+ ConcurrentOpenHashMap<String, String> map =
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertNull(map.put("1", "one"));
@@ -98,9 +109,64 @@ public class ConcurrentOpenHashMapTest {
assertEquals(map.size(), 3);
}
+ @Test
+ public void testClear() {
+ ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put("k1", "v1"));
+ assertNull(map.put("k2", "v2"));
+ assertNull(map.put("k3", "v3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentOpenHashMap<String, String> map = ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put("k1", "v1"));
+ assertNull(map.put("k2", "v2"));
+ assertNull(map.put("k3", "v3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove("k1", "v1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove("k2", "v2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertNull(map.put("k4", "v4"));
+ assertNull(map.put("k5", "v5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove() operation
+ assertNull(map.put("k6", "v6"));
+ assertTrue(map.remove("k6", "v6"));
+ assertTrue(map.capacity() == 8);
+ }
+
@Test
public void testRemove() {
- ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<String, String> map =
+ ConcurrentOpenHashMap.<String, String>newBuilder().build();
assertTrue(map.isEmpty());
assertNull(map.put("1", "one"));
@@ -117,7 +183,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentOpenHashMap<String, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+ ConcurrentOpenHashMap<String, Integer> map = ConcurrentOpenHashMap.<String, Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -132,7 +201,11 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+ ConcurrentOpenHashMap<Integer, Integer> map =
+ ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -154,7 +227,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -188,7 +264,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -222,7 +299,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testIteration() {
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -266,7 +344,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(Buckets, 1);
+ ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+ .expectedItems(Buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -299,7 +380,8 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<Long, String> map =
+ ConcurrentOpenHashMap.<Long, String>newBuilder().build();
assertNull(map.putIfAbsent(1l, "one"));
assertEquals(map.get(1l), "one");
@@ -309,7 +391,10 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testComputeIfAbsent() {
- ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<Integer, Integer> map = ConcurrentOpenHashMap.<Integer, Integer>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
AtomicInteger counter = new AtomicInteger();
Function<Integer, Integer> provider = key -> counter.getAndIncrement();
@@ -350,7 +435,8 @@ public class ConcurrentOpenHashMapTest {
}
}
- ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+ ConcurrentOpenHashMap<T, String> map =
+ ConcurrentOpenHashMap.<T, String>newBuilder().build();
T t1 = new T(1);
T t1_b = new T(1);
@@ -372,7 +458,11 @@ public class ConcurrentOpenHashMapTest {
@Test
public void testNullValue() {
- ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
+ ConcurrentOpenHashMap<String, String> map =
+ ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
String key = "a";
assertThrows(NullPointerException.class, () -> map.put(key, null));
@@ -406,7 +496,10 @@ public class ConcurrentOpenHashMapTest {
static final int N = 1_000_000;
public void benchConcurrentOpenHashMap() throws Exception {
- ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(N, 1);
+ ConcurrentOpenHashMap<Long, String> map = ConcurrentOpenHashMap.<Long, String>newBuilder()
+ .expectedItems(N)
+ .concurrencyLevel(1)
+ .build();
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
index 3c1d99668d7..af62948b64a 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java
@@ -91,9 +91,66 @@ public class ConcurrentOpenHashSetTest {
assertEquals(set.size(), 3);
}
+ @Test
+ public void testClear() {
+ ConcurrentOpenHashSet<String> map =
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add("k1"));
+ assertTrue(map.add("k2"));
+ assertTrue(map.add("k3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentOpenHashSet<String> map =
+ ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertTrue(map.add("k1"));
+ assertTrue(map.add("k2"));
+ assertTrue(map.add("k3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove("k1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove("k2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertTrue(map.add("k4"));
+ assertTrue(map.add("k5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove() operation
+ assertTrue(map.add("k6"));
+ assertTrue(map.remove("k6"));
+ assertTrue(map.capacity() == 8);
+ }
+
@Test
public void testRemove() {
- ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<String> set =
+ ConcurrentOpenHashSet.<String>newBuilder().build();
assertTrue(set.isEmpty());
assertTrue(set.add("1"));
@@ -145,7 +202,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> set =
+ ConcurrentOpenHashSet.<Long>newBuilder().build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -178,7 +236,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentOpenHashSet<Long> map = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> map =
+ ConcurrentOpenHashSet.<Long>newBuilder().build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -211,7 +270,7 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testIteration() {
- ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Long> set = ConcurrentOpenHashSet.<Long>newBuilder().build();
assertEquals(set.values(), Collections.emptyList());
@@ -237,7 +296,8 @@ public class ConcurrentOpenHashSetTest {
@Test
public void testRemoval() {
- ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<Integer> set =
+ ConcurrentOpenHashSet.<Integer>newBuilder().build();
set.add(0);
set.add(1);
@@ -315,7 +375,8 @@ public class ConcurrentOpenHashSetTest {
}
}
- ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+ ConcurrentOpenHashSet<T> set =
+ ConcurrentOpenHashSet.<T>newBuilder().build();
T t1 = new T(1);
T t1_b = new T(1);
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 1318e5c3275..1cce453541a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -88,9 +88,17 @@ public class WebSocketService implements Closeable {
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
this.localCluster = localCluster;
- this.topicProducerMap = new ConcurrentOpenHashMap<>();
- this.topicConsumerMap = new ConcurrentOpenHashMap<>();
- this.topicReaderMap = new ConcurrentOpenHashMap<>();
+ this.topicProducerMap =
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<ProducerHandler>>newBuilder()
+ .build();
+ this.topicConsumerMap =
+ ConcurrentOpenHashMap.<String,
+ ConcurrentOpenHashSet<ConsumerHandler>>newBuilder()
+ .build();
+ this.topicReaderMap =
+ ConcurrentOpenHashMap.<String, ConcurrentOpenHashSet<ReaderHandler>>newBuilder()
+ .build();
this.proxyStats = new ProxyStats(this);
}
@@ -260,7 +268,8 @@ public class WebSocketService implements Closeable {
public boolean addProducer(ProducerHandler producer) {
return topicProducerMap
- .computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+ .computeIfAbsent(producer.getProducer().getTopic(),
+ topic -> ConcurrentOpenHashSet.<ProducerHandler>newBuilder().build())
.add(producer);
}
@@ -278,7 +287,8 @@ public class WebSocketService implements Closeable {
public boolean addConsumer(ConsumerHandler consumer) {
return topicConsumerMap
- .computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+ .computeIfAbsent(consumer.getConsumer().getTopic(), topic ->
+ ConcurrentOpenHashSet.<ConsumerHandler>newBuilder().build())
.add(consumer);
}
@@ -295,7 +305,8 @@ public class WebSocketService implements Closeable {
}
public boolean addReader(ReaderHandler reader) {
- return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
+ return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic ->
+ ConcurrentOpenHashSet.<ReaderHandler>newBuilder().build())
.add(reader);
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
index 9bf5f4a68f6..8fa91130ae4 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyStats.java
@@ -52,7 +52,9 @@ public class ProxyStats {
super();
this.service = service;
this.jvmMetrics = new JvmMetrics(service);
- this.topicStats = new ConcurrentOpenHashMap<>();
+ this.topicStats =
+ ConcurrentOpenHashMap.<String, ProxyNamespaceStats>newBuilder()
+ .build();
this.metricsCollection = Lists.newArrayList();
this.tempMetricsCollection = Lists.newArrayList();
// schedule stat generation task every 1 minute