You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:13 UTC

[pulsar] 02/15: Support shrink in ConcurrentLongHashMap (#14497)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 241320fd891945fee88470935ee8ea8e616f7ab1
Author: lin chen <15...@qq.com>
AuthorDate: Tue Mar 1 21:16:52 2022 +0800

    Support shrink in ConcurrentLongHashMap (#14497)
    
    (cherry picked from commit 297941964ed739e35ca68aa46d74410cf112b7bc)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   7 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  10 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  23 +++-
 .../client/impl/TransactionMetaStoreHandler.java   |   5 +-
 .../TransactionCoordinatorClientImpl.java          |   6 +-
 .../util/collections/ConcurrentLongHashMap.java    | 139 ++++++++++++++++++---
 .../collections/ConcurrentLongHashMapTest.java     | 122 +++++++++++++++---
 7 files changed, 272 insertions(+), 40 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index dc448ca4f45..deed473d41e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -152,8 +152,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected Map<String, String> propertiesMap;
     protected final MetaStore store;
 
-    final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache = new ConcurrentLongHashMap<>(
-            16 /* initial capacity */, 1 /* number of sections */);
+    final ConcurrentLongHashMap<CompletableFuture<ReadHandle>> ledgerCache =
+            ConcurrentLongHashMap.<CompletableFuture<ReadHandle>>newBuilder()
+                    .expectedItems(16) // initial capacity
+                    .concurrencyLevel(1) // number of sections
+                    .build();
     protected final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
     private volatile Stat ledgersStat;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1a0be5ea9ce..a81652ec3c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -229,8 +229,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         ServiceConfiguration conf = pulsar.getConfiguration();
 
         // This maps are not heavily contended since most accesses are within the cnx thread
-        this.producers = new ConcurrentLongHashMap<>(8, 1);
-        this.consumers = new ConcurrentLongHashMap<>(8, 1);
+        this.producers = ConcurrentLongHashMap.<CompletableFuture<Producer>>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .build();
+        this.consumers = ConcurrentLongHashMap.<CompletableFuture<Consumer>>newBuilder()
+                .expectedItems(8)
+                .concurrencyLevel(1)
+                .build();
         this.replicatorPrefix = conf.getReplicatorPrefix();
         this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         this.proxyRoles = conf.getProxyRoles();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index fa24286b175..e8caecf1889 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -105,13 +105,28 @@ public class ClientCnx extends PulsarHandler {
     private State state;
 
     private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
-        new ConcurrentLongHashMap<>(16, 1);
+            ConcurrentLongHashMap.<TimedCompletableFuture<? extends Object>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
-    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
-    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+            ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+            ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 05a5cae0424..4cb89e53b7a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -58,7 +58,10 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     private final long transactionCoordinatorId;
     private final ConnectionHandler connectionHandler;
     private final ConcurrentLongHashMap<OpBase<?>> pendingRequests =
-        new ConcurrentLongHashMap<>(16, 1);
+            ConcurrentLongHashMap.<OpBase<?>>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
 
     private static class RequestTime {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 8db80545ad2..e8baec784a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -52,7 +52,11 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
 
     private final PulsarClientImpl pulsarClient;
     private TransactionMetaStoreHandler[] handlers;
-    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = new ConcurrentLongHashMap<>(16, 1);
+    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
     private final AtomicLong epoch = new AtomicLong(0);
 
     private static final AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> STATE_UPDATER =
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index cd285221bc8..a4779357a44 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -44,33 +44,112 @@ public class ConcurrentLongHashMap<V> {
     private static final Object EmptyValue = null;
     private static final Object DeletedValue = new Object();
 
-    private static final float MapFillFactor = 0.66f;
-
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
+    public static <V> Builder<V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * Builder of ConcurrentLongHashMap.
+     */
+    public static class Builder<T> {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder<T> expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder<T> concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder<T> mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder<T> mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder<T> expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder<T> shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder<T> autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongHashMap<T> build() {
+            return new ConcurrentLongHashMap<>(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
     private final Section<V>[] sections;
 
+    @Deprecated
     public ConcurrentLongHashMap() {
         this(DefaultExpectedItems);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems) {
         this(expectedItems, DefaultConcurrencyLevel);
     }
 
+    @Deprecated
     public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+        this(expectedItems, concurrencyLevel, DefaultMapFillFactor, DefaultMapIdleFactor,
+                DefaultAutoShrink, DefaultExpandFactor, DefaultShrinkFactor);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel,
+                                 float mapFillFactor, float mapIdleFactor,
+                                 boolean autoShrink, float expandFactor, float shrinkFactor) {
         checkArgument(expectedItems > 0);
         checkArgument(concurrencyLevel > 0);
         checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
 
         int numSections = concurrencyLevel;
         int perSectionExpectedItems = expectedItems / numSections;
-        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
         this.sections = (Section<V>[]) new Section[numSections];
 
         for (int i = 0; i < numSections; i++) {
-            sections[i] = new Section<>(perSectionCapacity);
+            sections[i] = new Section<>(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
         }
     }
 
@@ -195,20 +274,35 @@ public class ConcurrentLongHashMap<V> {
         private volatile V[] values;
 
         private volatile int capacity;
+        private final int initCapacity;
         private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
                 AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
 
         private volatile int size;
         private int usedBuckets;
-        private int resizeThreshold;
-
-        Section(int capacity) {
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
             this.keys = new long[this.capacity];
             this.values = (V[]) new Object[this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
-            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
         }
 
         V get(long key, int keyHash) {
@@ -322,9 +416,10 @@ public class ConcurrentLongHashMap<V> {
                     ++bucket;
                 }
             } finally {
-                if (usedBuckets >= resizeThreshold) {
+                if (usedBuckets > resizeThresholdUp) {
                     try {
-                        rehash();
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
                     } finally {
                         unlockWrite(stamp);
                     }
@@ -373,7 +468,20 @@ public class ConcurrentLongHashMap<V> {
                 }
 
             } finally {
-                unlockWrite(stamp);
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
             }
         }
 
@@ -385,6 +493,9 @@ public class ConcurrentLongHashMap<V> {
                 Arrays.fill(values, EmptyValue);
                 this.size = 0;
                 this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
             } finally {
                 unlockWrite(stamp);
             }
@@ -439,9 +550,8 @@ public class ConcurrentLongHashMap<V> {
             }
         }
 
-        private void rehash() {
+        private void rehash(int newCapacity) {
             // Expand the hashmap
-            int newCapacity = capacity * 2;
             long[] newKeys = new long[newCapacity];
             V[] newValues = (V[]) new Object[newCapacity];
 
@@ -458,7 +568,8 @@ public class ConcurrentLongHashMap<V> {
             values = newValues;
             capacity = newCapacity;
             usedBuckets = size;
-            resizeThreshold = (int) (capacity * MapFillFactor);
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
         }
 
         private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 14d8395ae8c..6cf126cf2ff 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -48,21 +48,29 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testConstructor() {
         try {
-            new ConcurrentLongHashMap<String>(0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(16, 0);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
         try {
-            new ConcurrentLongHashMap<String>(4, 8);
+            ConcurrentLongHashMap.<String>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
             fail("should have thrown exception");
         } catch (IllegalArgumentException e) {
             // ok
@@ -71,7 +79,9 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void simpleInsertions() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -97,9 +107,64 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), 3);
     }
 
+    @Test
+    public void testClear() {
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertNull(map.put(1, "v1"));
+        assertNull(map.put(2, "v2"));
+        assertNull(map.put(3, "v3"));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, "v1"));
+        // size (2) is not below the idle threshold (8 * 0.25 = 2), so no shrink yet
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, "v2"));
+        // size (1) is now below the idle threshold (2), so the map shrinks to 4
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertNull(map.put(4, "v4"));
+        assertNull(map.put(5, "v5"));
+        assertTrue(map.capacity() == 8);
+
+        // Verify that the map does not keep shrinking on every remove() operation.
+        assertNull(map.put(6, "v6"));
+        assertTrue(map.remove(6, "v6"));
+        assertTrue(map.capacity() == 8);
+    }
+
     @Test
     public void testRemove() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
 
         assertTrue(map.isEmpty());
         assertNull(map.put(1, "one"));
@@ -115,7 +180,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testNegativeUsedBucketCount() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
 
         map.put(0, "zero");
         assertEquals(1, map.getUsedBucketCount());
@@ -130,7 +198,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashing() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -145,7 +216,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testRehashingWithDeletes() {
         int n = 16;
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
         assertEquals(map.capacity(), n);
         assertEquals(map.size(), 0);
 
@@ -167,7 +241,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertions() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -201,7 +276,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void concurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -235,7 +311,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void stressConcurrentInsertionsAndReads() throws Throwable {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(4)
+                .concurrencyLevel(1)
+                .build();
         @Cleanup("shutdownNow")
         ExecutorService executor = Executors.newCachedThreadPool();
         final int writeThreads = 16;
@@ -286,7 +365,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testIteration() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
 
         assertEquals(map.keys(), Collections.emptyList());
         assertEquals(map.values(), Collections.emptyList());
@@ -330,7 +410,10 @@ public class ConcurrentLongHashMapTest {
     @Test
     public void testHashConflictWithDeletion() {
         final int Buckets = 16;
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(Buckets)
+                .concurrencyLevel(1)
+                .build();
 
         // Pick 2 keys that fall into the same bucket
         long key1 = 1;
@@ -363,7 +446,8 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testPutIfAbsent() {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .build();
         assertNull(map.putIfAbsent(1, "one"));
         assertEquals(map.get(1), "one");
 
@@ -373,7 +457,10 @@ public class ConcurrentLongHashMapTest {
 
     @Test
     public void testComputeIfAbsent() {
-        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+        ConcurrentLongHashMap<Integer> map = ConcurrentLongHashMap.<Integer>newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
         AtomicInteger counter = new AtomicInteger();
         LongFunction<Integer> provider = key -> counter.getAndIncrement();
 
@@ -395,7 +482,10 @@ public class ConcurrentLongHashMapTest {
     static final int N = 100_000;
 
     public void benchConcurrentLongHashMap() throws Exception {
-        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+        ConcurrentLongHashMap<String> map = ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(N)
+                .concurrencyLevel(1)
+                .build();
 
         for (long i = 0; i < Iterations; i++) {
             for (int j = 0; j < N; j++) {