You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:13 UTC
[pulsar] 02/15: Support shrink in ConcurrentLongHashMap (#14497)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 241320fd891945fee88470935ee8ea8e616f7ab1
Author: lin chen <15...@qq.com>
AuthorDate: Tue Mar 1 21:16:52 2022 +0800
Support shrink in ConcurrentLongHashMap (#14497)
(cherry picked from commit 297941964ed739e35ca68aa46d74410cf112b7bc)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +-
.../apache/pulsar/broker/service/ServerCnx.java | 10 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 23 +++-
.../client/impl/TransactionMetaStoreHandler.java | 5 +-
.../TransactionCoordinatorClientImpl.java | 6 +-
.../util/collections/ConcurrentLongHashMap.java | 139 ++++++++++++++++++---
.../collections/ConcurrentLongHashMapTest.java | 122 +++++++++++++++---
7 files changed, 272 insertions(+), 40 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index dc448ca4f45..deed473d41e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected Map<String, String> propertiesMap;
protected final MetaStore store;
- final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
- 16 /* initial capacity */, 1 /* number of sections */);
+ final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
+ ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
+ .expectedItems(16) // initial capacity
+ .concurrencyLevel(1) // number of sections
+ .build();
protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private volatile Stat ledgersStat;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1a0be5ea9ce..a81652ec3c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -229,8 +229,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ServiceConfiguration conf = pulsar.getConfiguration();
// This maps are not heavily contended since most accesses are within the cnx thread
- this.producers = new ConcurrentLongHashMap<>(8, 1);
- this.consumers = new ConcurrentLongHashMap<>(8, 1);
+ this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
+ .expectedItems(8)
+ .concurrencyLevel(1)
+ .build();
+ this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
+ .expectedItems(8)
+ .concurrencyLevel(1)
+ .build();
this.replicatorPrefix = conf.getReplicatorPrefix();
this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = conf.getProxyRoles();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index fa24286b175..e8caecf1889 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -105,13 +105,28 @@ public class ClientCnx extends PulsarHandler {
private State state;
private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
- new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
// LookupRequests that waiting in client side.
private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
- private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
- private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
- private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
+ private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+ ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+ ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
+ private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
+ ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 05a5cae0424..4cb89e53b7a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -58,7 +58,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
private final long transactionCoordinatorId;
private final ConnectionHandler connectionHandler;
private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
- new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap.<OpBase<?>>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
private static class RequestTime {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 8db80545ad2..e8baec784a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
private final PulsarClientImpl pulsarClient;
private TransactionMetaStoreHandler[] handlers;
- private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
+ private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
+ ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
private final AtomicLong epoch = new AtomicLong(0);
private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index cd285221bc8..a4779357a44 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
private static final Object EmptyValue = null;
private static final Object DeletedValue = new Object();
- private static final float MapFillFactor = 0.66f;
-
private static final int DefaultExpectedItems = 256;
private static final int DefaultConcurrencyLevel = 16;
+ private static final float DefaultMapFillFactor = 0.66f;
+ private static final float DefaultMapIdleFactor = 0.15f;
+
+ private static final float DefaultExpandFactor = 2;
+ private static final float DefaultShrinkFactor = 2;
+
+ private static final boolean DefaultAutoShrink = false;
+
+ public static <V> Builder<V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * Builder of ConcurrentLongHashMap.
+ */
+ public static class Builder<T> {
+ int expectedItems = DefaultExpectedItems;
+ int concurrencyLevel = DefaultConcurrencyLevel;
+ float mapFillFactor = DefaultMapFillFactor;
+ float mapIdleFactor = DefaultMapIdleFactor;
+ float expandFactor = DefaultExpandFactor;
+ float shrinkFactor = DefaultShrinkFactor;
+ boolean autoShrink = DefaultAutoShrink;
+
+ public Builder<T> expectedItems(int expectedItems) {
+ this.expectedItems = expectedItems;
+ return this;
+ }
+
+ public Builder<T> concurrencyLevel(int concurrencyLevel) {
+ this.concurrencyLevel = concurrencyLevel;
+ return this;
+ }
+
+ public Builder<T> mapFillFactor(float mapFillFactor) {
+ this.mapFillFactor = mapFillFactor;
+ return this;
+ }
+
+ public Builder<T> mapIdleFactor(float mapIdleFactor) {
+ this.mapIdleFactor = mapIdleFactor;
+ return this;
+ }
+
+ public Builder<T> expandFactor(float expandFactor) {
+ this.expandFactor = expandFactor;
+ return this;
+ }
+
+ public Builder<T> shrinkFactor(float shrinkFactor) {
+ this.shrinkFactor = shrinkFactor;
+ return this;
+ }
+
+ public Builder<T> autoShrink(boolean autoShrink) {
+ this.autoShrink = autoShrink;
+ return this;
+ }
+
+ public ConcurrentLongHashMap<T> build() {
+ return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
+ mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+ }
+ }
+
private final Section<V>[] sections;
+ @Deprecated
public ConcurrentLongHashMap() {
this(DefaultExpectedItems);
}
+ @Deprecated
public ConcurrentLongHashMap(int expectedItems) {
this(expectedItems, DefaultConcurrencyLevel);
}
+ @Deprecated
public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+ this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+ DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+ }
+
+ public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
+ float mapFillFactor, float mapIdleFactor,
+ boolean autoShrink, float expandFactor, float shrinkFactor) {
checkArgument(expectedItems > 0);
checkArgument(concurrencyLevel > 0);
checkArgument(expectedItems >= concurrencyLevel);
+ checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+ checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+ checkArgument(mapFillFactor > mapIdleFactor);
+ checkArgument(expandFactor > 1);
+ checkArgument(shrinkFactor > 1);
int numSections = concurrencyLevel;
int perSectionExpectedItems = expectedItems / numSections;
- int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+ int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
this.sections = (Section<V>[]) new Section[numSections];
for (int i = 0; i < numSections; i++) {
- sections[i] = new Section<>(perSectionCapacity);
+ sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+ autoShrink, expandFactor, shrinkFactor);
}
}
@@ -195,20 +274,35 @@ public class ConcurrentLongHashMap<V> {
private volatile V[] values;
private volatile int capacity;
+ private final int initCapacity;
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
private volatile int size;
private int usedBuckets;
- private int resizeThreshold;
-
- Section(int capacity) {
+ private int resizeThresholdUp;
+ private int resizeThresholdBelow;
+ private final float mapFillFactor;
+ private final float mapIdleFactor;
+ private final float expandFactor;
+ private final float shrinkFactor;
+ private final boolean autoShrink;
+
+ Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+ float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
+ this.initCapacity = this.capacity;
this.keys = new long[this.capacity];
this.values = (V[]) new Object[this.capacity];
this.size = 0;
this.usedBuckets = 0;
- this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ this.autoShrink = autoShrink;
+ this.mapFillFactor = mapFillFactor;
+ this.mapIdleFactor = mapIdleFactor;
+ this.expandFactor = expandFactor;
+ this.shrinkFactor = shrinkFactor;
+ this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+ this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
}
V get(long key, int keyHash) {
@@ -322,9 +416,10 @@ public class ConcurrentLongHashMap<V> {
++bucket;
}
} finally {
- if (usedBuckets >= resizeThreshold) {
+ if (usedBuckets > resizeThresholdUp) {
try {
- rehash();
+ int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+ rehash(newCapacity);
} finally {
unlockWrite(stamp);
}
@@ -373,7 +468,20 @@ public class ConcurrentLongHashMap<V> {
}
} finally {
- unlockWrite(stamp);
+ if (autoShrink && size < resizeThresholdBelow) {
+ try {
+ int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+ int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ if (newCapacity < capacity && newResizeThresholdUp > size) {
+ // shrink the hashmap
+ rehash(newCapacity);
+ }
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
}
}
@@ -385,6 +493,9 @@ public class ConcurrentLongHashMap<V> {
Arrays.fill(values, EmptyValue);
this.size = 0;
this.usedBuckets = 0;
+ if (autoShrink) {
+ rehash(initCapacity);
+ }
} finally {
unlockWrite(stamp);
}
@@ -439,9 +550,8 @@ public class ConcurrentLongHashMap<V> {
}
}
- private void rehash() {
+ private void rehash(int newCapacity) {
// Expand the hashmap
- int newCapacity = capacity * 2;
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];
@@ -458,7 +568,8 @@ public class ConcurrentLongHashMap<V> {
values = newValues;
capacity = newCapacity;
usedBuckets = size;
- resizeThreshold = (int) (capacity * MapFillFactor);
+ resizeThresholdUp = (int) (capacity * mapFillFactor);
+ resizeThresholdBelow = (int) (capacity * mapIdleFactor);
}
private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 14d8395ae8c..6cf126cf2ff 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest {
@Test
public void testConstructor() {
try {
- new ConcurrentLongHashMap<String>(0);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashMap<String>(16, 0);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(0)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
}
try {
- new ConcurrentLongHashMap<String>(4, 8);
+ ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(8)
+ .build();
fail("should have thrown exception");
} catch (IllegalArgumentException e) {
// ok
@@ -71,7 +79,9 @@ public class ConcurrentLongHashMapTest {
@Test
public void simpleInsertions() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .build();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -97,9 +107,64 @@ public class ConcurrentLongHashMapTest {
assertEquals(map.size(), 3);
}
+ @Test
+ public void testClear() {
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put(1, "v1"));
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+
+ assertTrue(map.capacity() == 8);
+ map.clear();
+ assertTrue(map.capacity() == 4);
+ }
+
+ @Test
+ public void testExpandAndShrink() {
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertTrue(map.capacity() == 4);
+
+ assertNull(map.put(1, "v1"));
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+
+ // expand hashmap
+ assertTrue(map.capacity() == 8);
+
+ assertTrue(map.remove(1, "v1"));
+ // not shrink
+ assertTrue(map.capacity() == 8);
+ assertTrue(map.remove(2, "v2"));
+ // shrink hashmap
+ assertTrue(map.capacity() == 4);
+
+ // expand hashmap
+ assertNull(map.put(4, "v4"));
+ assertNull(map.put(5, "v5"));
+ assertTrue(map.capacity() == 8);
+
+ //verify that the map does not keep shrinking at every remove() operation
+ assertNull(map.put(6, "v6"));
+ assertTrue(map.remove(6, "v6"));
+ assertTrue(map.capacity() == 8);
+ }
+
@Test
public void testRemove() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .build();
assertTrue(map.isEmpty());
assertNull(map.put(1, "one"));
@@ -115,7 +180,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testNegativeUsedBucketCount() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
map.put(0, "zero");
assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +198,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRehashing() {
int n = 16;
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+ ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -145,7 +216,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testRehashingWithDeletes() {
int n = 16;
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+ ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(n / 2)
+ .concurrencyLevel(1)
+ .build();
assertEquals(map.capacity(), n);
assertEquals(map.size(), 0);
@@ -167,7 +241,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void concurrentInsertions() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -201,7 +276,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void concurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
@@ -235,7 +311,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void stressConcurrentInsertionsAndReads() throws Throwable {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(1)
+ .build();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final int writeThreads = 16;
@@ -286,7 +365,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void testIteration() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .build();
assertEquals(map.keys(), Collections.emptyList());
assertEquals(map.values(), Collections.emptyList());
@@ -330,7 +410,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testHashConflictWithDeletion() {
final int Buckets = 16;
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(Buckets)
+ .concurrencyLevel(1)
+ .build();
// Pick 2 keys that fall into the same bucket
long key1 = 1;
@@ -363,7 +446,8 @@ public class ConcurrentLongHashMapTest {
@Test
public void testPutIfAbsent() {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .build();
assertNull(map.putIfAbsent(1, "one"));
assertEquals(map.get(1), "one");
@@ -373,7 +457,10 @@ public class ConcurrentLongHashMapTest {
@Test
public void testComputeIfAbsent() {
- ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+ ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+ .expectedItems(16)
+ .concurrencyLevel(1)
+ .build();
AtomicInteger counter = new AtomicInteger();
LongFunction<Integer> provider = key -> counter.getAndIncrement();
@@ -395,7 +482,10 @@ public class ConcurrentLongHashMapTest {
static final int N = 100_000;
public void benchConcurrentLongHashMap() throws Exception {
- ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+ ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(N)
+ .concurrencyLevel(1)
+ .build();
for (long i = 0; i < Iterations; i++) {
for (int j = 0; j < N; j++) {