You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 14:11:14 UTC

[pulsar] 03/15: Optimize memory usage: support to shrink for pendingAcks map (#14515)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cbf9f4f0ddfff09568b302dadc157a45a1f90ba
Author: lin chen <15...@qq.com>
AuthorDate: Sun Mar 6 04:58:14 2022 +0800

    Optimize memory usage: support to  shrink for pendingAcks map (#14515)
    
    (cherry picked from commit e747b8f16b0b660231ff27a8c2100d67ad7c79a6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 +
 .../org/apache/pulsar/broker/service/Consumer.java |  11 +-
 .../collections/ConcurrentLongLongPairHashMap.java | 673 +++++++++++++++++++++
 .../ConcurrentLongLongPairHashMapTest.java         | 427 +++++++++++++
 4 files changed, 1116 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 97acfd6a13d..f9ccad6a3a3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -561,6 +561,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean isAllowAutoUpdateSchemaEnabled = true;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Whether to enable the automatic shrink of pendingAcks map, "
+                    + "the default is false, which means it is not enabled. "
+                    + "When there are a large number of share or key share consumers in the cluster, "
+                    + "it can be enabled to reduce the memory consumption caused by pendingAcks.")
+    private boolean autoShrinkForConsumerPendingAcksMap = false;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 57aa101c719..4be5cb641d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -36,8 +36,6 @@ import java.util.stream.Collectors;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -58,6 +56,8 @@ import org.apache.pulsar.common.stats.Rate;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,7 +179,12 @@ public class Consumer {
         stats.metadata = this.metadata;
 
         if (Subscription.isIndividualAckMode(subType)) {
-            this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
+            this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
+                    .autoShrink(subscription.getTopic().getBrokerService()
+                            .getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap())
+                    .expectedItems(256)
+                    .concurrencyLevel(1)
+                    .build();
         } else {
             // We don't need to keep track of pending acks if the subscription is not shared
             this.pendingAcks = null;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
new file mode 100644
index 00000000000..eac7268ba67
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMap.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Concurrent hash map where both keys and values are composed of pairs of longs.
+ *
+ * <p>(long,long) --&gt; (long,long)
+ *
+ * <p>Provides similar methods as a {@code ConcurrentMap<K,V>} but since it's an open hash map with linear probing,
+ * no node allocations are required to store the keys and values, and no boxing is required.
+ *
+ * <p>Keys <strong>MUST</strong> be &gt;= 0.
+ */
+public class ConcurrentLongLongPairHashMap {
+
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private static final float DefaultMapFillFactor = 0.66f;
+    private static final float DefaultMapIdleFactor = 0.15f;
+
+    private static final float DefaultExpandFactor = 2;
+    private static final float DefaultShrinkFactor = 2;
+
+    private static final boolean DefaultAutoShrink = false;
+
+    private final Section[] sections;
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder of ConcurrentLongLongPairHashMap.
+     */
+    public static class Builder {
+        int expectedItems = DefaultExpectedItems;
+        int concurrencyLevel = DefaultConcurrencyLevel;
+        float mapFillFactor = DefaultMapFillFactor;
+        float mapIdleFactor = DefaultMapIdleFactor;
+        float expandFactor = DefaultExpandFactor;
+        float shrinkFactor = DefaultShrinkFactor;
+        boolean autoShrink = DefaultAutoShrink;
+
+        public Builder expectedItems(int expectedItems) {
+            this.expectedItems = expectedItems;
+            return this;
+        }
+
+        public Builder concurrencyLevel(int concurrencyLevel) {
+            this.concurrencyLevel = concurrencyLevel;
+            return this;
+        }
+
+        public Builder mapFillFactor(float mapFillFactor) {
+            this.mapFillFactor = mapFillFactor;
+            return this;
+        }
+
+        public Builder mapIdleFactor(float mapIdleFactor) {
+            this.mapIdleFactor = mapIdleFactor;
+            return this;
+        }
+
+        public Builder expandFactor(float expandFactor) {
+            this.expandFactor = expandFactor;
+            return this;
+        }
+
+        public Builder shrinkFactor(float shrinkFactor) {
+            this.shrinkFactor = shrinkFactor;
+            return this;
+        }
+
+        public Builder autoShrink(boolean autoShrink) {
+            this.autoShrink = autoShrink;
+            return this;
+        }
+
+        public ConcurrentLongLongPairHashMap build() {
+            return new ConcurrentLongLongPairHashMap(expectedItems, concurrencyLevel,
+                    mapFillFactor, mapIdleFactor, autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    /**
+     * A BiConsumer Long pair.
+     */
+    public interface BiConsumerLongPair {
+        void accept(long key1, long key2, long value1, long value2);
+    }
+
+    /**
+     * A Long pair function.
+     */
+    public interface LongLongPairFunction {
+        long apply(long key1, long key2);
+    }
+
+    /**
+     * A Long pair predicate.
+     */
+    public interface LongLongPairPredicate {
+        boolean test(long key1, long key2, long value1, long value2);
+    }
+
+    private ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel,
+                                          float mapFillFactor, float mapIdleFactor,
+                                         boolean autoShrink, float expandFactor, float shrinkFactor) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+        checkArgument(mapFillFactor > 0 && mapFillFactor < 1);
+        checkArgument(mapIdleFactor > 0 && mapIdleFactor < 1);
+        checkArgument(mapFillFactor > mapIdleFactor);
+        checkArgument(expandFactor > 1);
+        checkArgument(shrinkFactor > 1);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / mapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity, mapFillFactor, mapIdleFactor,
+                    autoShrink, expandFactor, shrinkFactor);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     * @param key1
+     * @param key2
+     * @return the value or -1 if the key was not present.
+     */
+    public LongPair get(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).get(key1, key2, (int) h);
+    }
+
+    public boolean containsKey(long key1, long key2) {
+        return get(key1, key2) != null;
+    }
+
+    public boolean put(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, false);
+    }
+
+    public boolean putIfAbsent(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, true);
+    }
+
+    /**
+     * Remove an existing entry if found.
+     *
+     * @param key1
+     * @param key2
+     * @return the value associated with the key or -1 if key was not present.
+     */
+    public boolean remove(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, ValueNotFound, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, value1, value2, (int) h);
+    }
+
+    private Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumerLongPair processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy).
+     */
+    public List<LongPair> keys() {
+        List<LongPair> keys = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> keys.add(new LongPair(key1, key2)));
+        return keys;
+    }
+
+    public List<LongPair> values() {
+        List<LongPair> values = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> values.add(new LongPair(value1, value2)));
+        return values;
+    }
+
+    public Map<LongPair, LongPair> asMap() {
+        Map<LongPair, LongPair> map = Maps.newHashMapWithExpectedSize((int) size());
+        forEach((key1, key2, value1, value2) -> map.put(new LongPair(key1, key2), new LongPair(value1, value2)));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private volatile long[] table;
+
+        private volatile int capacity;
+        private final int initCapacity;
+        private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
+                AtomicIntegerFieldUpdater.newUpdater(Section.class, "size");
+
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThresholdUp;
+        private int resizeThresholdBelow;
+        private final float mapFillFactor;
+        private final float mapIdleFactor;
+        private final float expandFactor;
+        private final float shrinkFactor;
+        private final boolean autoShrink;
+
+        Section(int capacity, float mapFillFactor, float mapIdleFactor, boolean autoShrink,
+                float expandFactor, float shrinkFactor) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.initCapacity = this.capacity;
+            this.table = new long[4 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.autoShrink = autoShrink;
+            this.mapFillFactor = mapFillFactor;
+            this.mapIdleFactor = mapIdleFactor;
+            this.expandFactor = expandFactor;
+            this.shrinkFactor = shrinkFactor;
+            this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
+            this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        LongPair get(long key1, long key2, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey1 = table[bucket];
+                            storedKey2 = table[bucket + 1];
+                            storedValue1 = table[bucket + 2];
+                            storedValue2 = table[bucket + 3];
+                        }
+
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean onlyIfAbsent) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (!onlyIfAbsent) {
+                            // Over written an old value for same key
+                            table[bucket + 2] = value1;
+                            table[bucket + 3] = value2;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key1;
+                        table[bucket + 1] = key2;
+                        table[bucket + 2] = value1;
+                        table[bucket + 3] = value2;
+                        SIZE_UPDATER.incrementAndGet(this);
+                        return true;
+                    } else if (storedKey1 == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThresholdUp) {
+                    try {
+                        // Expand the hashmap
+                        int newCapacity = alignToPowerOfTwo((int) (capacity * expandFactor));
+                        rehash(newCapacity);
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long key1, long key2, long value1, long value2, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (value1 == ValueNotFound || (value1 == storedValue1 && value2 == storedValue2)) {
+                            SIZE_UPDATER.decrementAndGet(this);
+
+                            cleanBucket(bucket);
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Key wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+
+            } finally {
+                if (autoShrink && size < resizeThresholdBelow) {
+                    try {
+                        int newCapacity = alignToPowerOfTwo((int) (capacity / shrinkFactor));
+                        int newResizeThresholdUp = (int) (newCapacity * mapFillFactor);
+                        if (newCapacity < capacity && newResizeThresholdUp > size) {
+                            // shrink the hashmap
+                            rehash(newCapacity);
+                        }
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 4) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = EmptyKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+                --usedBuckets;
+
+                // Cleanup all the buckets that were in `DeletedKey` state, so that we can reduce unnecessary expansions
+                bucket = (bucket - 4) & (table.length - 1);
+                while (table[bucket] == DeletedKey) {
+                    table[bucket] = EmptyKey;
+                    table[bucket + 1] = EmptyKey;
+                    table[bucket + 2] = ValueNotFound;
+                    table[bucket + 3] = ValueNotFound;
+                    --usedBuckets;
+
+                    bucket = (bucket - 4) & (table.length - 1);
+                }
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = DeletedKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+                if (autoShrink) {
+                    rehash(initCapacity);
+                }
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumerLongPair processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 4) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey1 = table[bucket];
+                        storedKey2 = table[bucket + 1];
+                        storedValue1 = table[bucket + 2];
+                        storedValue2 = table[bucket + 3];
+                    }
+
+                    if (storedKey1 != DeletedKey && storedKey1 != EmptyKey) {
+                        processor.accept(storedKey1, storedKey2, storedValue1, storedValue2);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash(int newCapacity) {
+            long[] newTable = new long[4 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 4) {
+                long storedKey1 = table[i];
+                long storedKey2 = table[i + 1];
+                long storedValue1 = table[i + 2];
+                long storedValue2 = table[i + 3];
+                if (storedKey1 != EmptyKey && storedKey1 != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey1, storedKey2, storedValue1, storedValue2);
+                }
+            }
+
+            table = newTable;
+            usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
+            resizeThresholdUp = (int) (capacity * mapFillFactor);
+            resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,
+                long value2) {
+            int bucket = signSafeMod(hash(key1, key2), capacity);
+
+            while (true) {
+                long storedKey1 = table[bucket];
+
+                if (storedKey1 == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key1;
+                    table[bucket + 1] = key2;
+                    table[bucket + 2] = value1;
+                    table[bucket + 3] = value2;
+                    return;
+                }
+
+                bucket = (bucket + 4) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key1, long key2) {
+        long hash = key1 * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        hash += 31 + (key2 * HashMixer);
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 2;
+    }
+
+    private static int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+
+    /**
+     * A pair of long values.
+     */
+    public static class LongPair implements Comparable<LongPair> {
+        public final long first;
+        public final long second;
+
+        public LongPair(long first, long second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof LongPair) {
+                LongPair other = (LongPair) obj;
+                return first == other.first && second == other.second;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) hash(first, second);
+        }
+
+        @Override
+        public int compareTo(LongPair o) {
+            if (first != o.first) {
+                return Long.compare(first, o.first);
+            } else {
+                return Long.compare(second, o.second);
+            }
+        }
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java
new file mode 100644
index 00000000000..98a96804d25
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.junit.Test;
+
+/**
+ * Test the concurrent long-long pair hashmap class.
+ */
+public class ConcurrentLongLongPairHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+             ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(0)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(0)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            ConcurrentLongLongPairHashMap.newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(8)
+                    .build();
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .build();
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+        assertEquals(map.size(), 3);
+
+        assertTrue(map.remove(1, 1));
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1, 1), null);
+        assertEquals(map.get(5, 5), null);
+        assertEquals(map.size(), 2);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertEquals(map.size(), 3);
+        assertTrue(map.put(1, 1, 111, 111));
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap
+                .newBuilder()
+                .build();
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 1, 111, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 1, 11, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testClear() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.capacity() == 4);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertTrue(map.capacity() == 8);
+        map.clear();
+        assertTrue(map.capacity() == 4);
+    }
+
+    @Test
+    public void testExpandAndShrink() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertTrue(map.put(1, 1, 11, 11));
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        // expand hashmap
+        assertTrue(map.capacity() == 8);
+
+        assertTrue(map.remove(1, 1, 11, 11));
+        // not shrink
+        assertTrue(map.capacity() == 8);
+        assertTrue(map.remove(2, 2, 22, 22));
+        // shrink hashmap
+        assertTrue(map.capacity() == 4);
+
+        // expand hashmap
+        assertTrue(map.put(4, 4, 44, 44));
+        assertTrue(map.put(5, 5, 55, 55));
+        assertTrue(map.capacity() == 8);
+
+        //verify that the map does not keep shrinking at every remove() operation
+        assertTrue(map.put(6, 6, 66, 66));
+        assertTrue(map.remove(6, 6, 66, 66));
+        assertTrue(map.capacity() == 8);
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+
+        map.put(0, 0, 0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 0, 1, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(n / 2)
+                .concurrencyLevel(1)
+                .build();
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i, i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i, i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int n = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < n; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int n = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < n; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(new LongPair(0, 0)));
+        assertEquals(map.values(), Lists.newArrayList(new LongPair(0, 0)));
+
+        map.remove(0, 0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+        map.put(1, 1, 11, 11);
+        map.put(2, 2, 22, 22);
+
+        List<LongPair> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        List<LongPair> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(11, 11), new LongPair(22, 22)));
+
+        map.put(1, 1, 111, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(22, 22), new LongPair(111, 111)));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .build();
+
+        assertTrue(map.putIfAbsent(1, 1, 11, 11));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+
+        assertFalse(map.putIfAbsent(1, 1, 111, 111));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+    }
+
+    @Test
+    public void testIvalidKeys() {
+        ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+
+
+        try {
+            map.put(-5, 3, 4, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1, 1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongPairHashMap lmap = ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(16)
+                .concurrencyLevel(1)
+                .build();
+        lmap.put(1, 1, 11, 11);
+        lmap.put(2, 2, 22, 22);
+        lmap.put(3, 3, 33, 33);
+
+        Map<LongPair, LongPair> map = new HashMap<>();
+        map.put(new LongPair(1, 1), new LongPair(11, 11));
+        map.put(new LongPair(2, 2), new LongPair(22, 22));
+        map.put(new LongPair(3, 3), new LongPair(33, 33));
+
+        assertEquals(map, lmap.asMap());
+    }
+}