Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/29 23:18:02 UTC

[3/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and sets for primitive types

BOOKKEEPER-964: Add concurrent maps and sets for primitive types

In BookKeeper there are many instances of maps and sets that use ledger ids
and entry ids as keys or values. JDK concurrent collections have the overhead
of boxing all primitive values into objects (e.g. long --> Long) that must be
allocated on the heap. In addition, the JDK map implementations are
closed-addressing hash tables, so each entry requires at least one more
allocation to hold the linked-list/tree node.

There are libraries already available that offer zero-allocation primitive
collections, but none of them supports concurrent maps/sets.

We have added a handful of specializations, all based on the same
implementation idea: a hash table that is broken down into multiple sections.
Each section, on its own, is an open-addressing hash table with linear
probing, protected by its own stamped lock.

All insertions, lookups, and iterations on these collections are allocation-free.

```
ConcurrentLongHashMap: Map<long, Object>
ConcurrentLongHashSet: Set<long>
ConcurrentLongLongHashMap: Map<long, long>
ConcurrentLongLongPairHashMap: Map< Pair<long, long>, Pair<long, long> >
ConcurrentOpenHashMap: Map<Object, Object>
ConcurrentOpenHashSet: Set<Object>
```
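
As a quick illustration (not part of this patch), here is a minimal usage
sketch; the classes and method names below are the ones added by this commit:

```
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

public class CollectionsExample {
    public static void main(String[] args) {
        // long -> Object map: no boxing of the key, no per-entry node allocation
        ConcurrentLongHashMap<String> names = new ConcurrentLongHashMap<>();
        names.put(1L, "first");
        names.computeIfAbsent(2L, key -> "entry-" + key);
        System.out.println(names.get(1L)); // prints: first

        // long -> long map with 256 expected items spread over 16 sections;
        // keys and values must be >= 0
        ConcurrentLongLongHashMap offsets = new ConcurrentLongLongHashMap(256, 16);
        offsets.put(10L, 1000L);
        long updated = offsets.addAndGet(10L, 24L); // atomic add, returns 1024
        offsets.forEach((key, value) ->
                System.out.println(key + " -> " + value)); // allocation-free iteration
        System.out.println(updated);
    }
}
```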

Author: Matteo Merli <mm...@yahoo-inc.com>

Reviewers: Sijie Guo <si...@apache.org>, Enrico Olivelli <En...@diennea.com>

Closes #72 from merlimat/bk-collections


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/ecbb053e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/ecbb053e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/ecbb053e

Branch: refs/heads/master
Commit: ecbb053e6e873859507e247cae727f4bc8b9f7fa
Parents: 4cf0978
Author: Matteo Merli <mm...@yahoo-inc.com>
Authored: Tue Nov 29 15:17:46 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Nov 29 15:17:46 2016 -0800

----------------------------------------------------------------------
 .../util/collections/ConcurrentLongHashMap.java | 494 +++++++++++++
 .../util/collections/ConcurrentLongHashSet.java | 421 +++++++++++
 .../collections/ConcurrentLongLongHashMap.java  | 723 +++++++++++++++++++
 .../ConcurrentLongLongPairHashMap.java          | 550 ++++++++++++++
 .../util/collections/ConcurrentOpenHashMap.java | 493 +++++++++++++
 .../util/collections/ConcurrentOpenHashSet.java | 416 +++++++++++
 .../collections/ConcurrentLongHashMapTest.java  | 435 +++++++++++
 .../collections/ConcurrentLongHashSetTest.java  | 275 +++++++
 .../ConcurrentLongLongHashMapTest.java          | 473 ++++++++++++
 .../ConcurrentLongLongPairHashMapTest.java      | 343 +++++++++
 .../collections/ConcurrentOpenHashMapTest.java  | 488 +++++++++++++
 .../collections/ConcurrentOpenHashSetTest.java  | 318 ++++++++
 12 files changed, 5429 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
new file mode 100644
index 0000000..63603cb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -0,0 +1,494 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.LongFunction;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Map from long to an Object.
+ *
+ * Provides similar methods as a ConcurrentMap&lt;long, Object&gt; with 2 differences:
+ * <ol>
+ * <li>No boxing/unboxing from long -> Long
+ * <li>Open hash map with linear probing, no node allocations to store the values
+ * </ol>
+ *
+ * @param <V> the value type
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentLongHashMap<V> {
+
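+    // An empty bucket holds a null value; a deleted bucket keeps this tombstone so linear-probe chains stay intact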
+    private static final Object EmptyValue = null;
+    private static final Object DeletedValue = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<V>[] sections;
+
+    public ConcurrentLongHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section<V> s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public V get(long key) {
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(long key) {
+        return get(key) != null;
+    }
+
+    public V put(long key, V value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public V putIfAbsent(long key, V value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public V computeIfAbsent(long key, LongFunction<V> provider) {
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, null, (int) h, true, provider);
+    }
+
+    public V remove(long key) {
+        long h = hash(key);
+        return getSection(h).remove(key, null, (int) h);
+    }
+
+    public boolean remove(long key, Object value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != null;
+    }
+
+    private Section<V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(EntryProcessor<V> processor) {
+        for (Section<V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<Long> keys() {
+        List<Long> keys = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    List<V> values() {
+        List<V> values = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    public static interface EntryProcessor<V> {
+        void accept(long key, V value);
+    }
+
+    // A section is a portion of the hash map that is covered by a single stamped lock
+    @SuppressWarnings("serial")
+    private static final class Section<V> extends StampedLock {
+        private long[] keys;
+        private V[] values;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.keys = new long[this.capacity];
+            this.values = (V[]) new Object[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        V get(long key, int keyHash) {
+            int bucket = keyHash;
+
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    // First try optimistic locking
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (storedKey == key) {
+                            return storedValue != DeletedValue ? storedValue : null;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+                            storedKey = keys[bucket];
+                            storedValue = values[bucket];
+                        }
+
+                        if (capacity != this.capacity) {
+                            // There has been a rehashing. We need to restart the search
+                            bucket = keyHash;
+                            continue;
+                        }
+
+                        if (storedKey == key) {
+                            return storedValue != DeletedValue ? storedValue : null;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valueProvider) {
+            int bucket = keyHash;
+
+            long stamp = writeLock();
+            int capacity = this.capacity;
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    bucket = signSafeMod(bucket, capacity);
+
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (storedKey == key) {
+                        if (storedValue == EmptyValue) {
+                            values[bucket] = value != null ? value : valueProvider.apply(key);
+                            ++size;
+                            ++usedBuckets;
+                            return valueProvider != null ? values[bucket] : null;
+                        } else if (storedValue == DeletedValue) {
+                            values[bucket] = value != null ? value : valueProvider.apply(key);
+                            ++size;
+                            return valueProvider != null ? values[bucket] : null;
+                        } else if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            values[bucket] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedValue == EmptyValue) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        keys[bucket] = key;
+                        values[bucket] = value != null ? value : valueProvider.apply(key);
+                        ++size;
+                        return valueProvider != null ? values[bucket] : null;
+                    } else if (storedValue == DeletedValue) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private V remove(long key, Object value, int keyHash) {
+            int bucket = keyHash;
+            long stamp = writeLock();
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+                    if (storedKey == key) {
+                        if (value == null || value.equals(storedValue)) {
+                            if (storedValue == EmptyValue || storedValue == DeletedValue) {
+                                return null;
+                            }
+
+                            --size;
+                            V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+                            if (nextValueInArray == EmptyValue) {
+                                values[bucket] = (V) EmptyValue;
+                                --usedBuckets;
+                            } else {
+                                values[bucket] = (V) DeletedValue;
+                            }
+
+                            return storedValue;
+                        } else {
+                            return null;
+                        }
+                    } else if (storedValue == EmptyValue) {
+                        // Key wasn't found
+                        return null;
+                    }
+
+                    ++bucket;
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(keys, 0);
+                Arrays.fill(values, EmptyValue);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(EntryProcessor<V> processor) {
+            long stamp = tryOptimisticRead();
+
+            int capacity = this.capacity;
+            long[] keys = this.keys;
+            V[] values = this.values;
+
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+
+                    capacity = this.capacity;
+                    keys = this.keys;
+                    values = this.values;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = keys[bucket];
+                        storedValue = values[bucket];
+                    }
+
+                    if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newKeys = new long[newCapacity];
+            V[] newValues = (V[]) new Object[newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < keys.length; i++) {
+                long storedKey = keys[i];
+                V storedValue = values[i];
+                if (storedValue != EmptyValue && storedValue != DeletedValue) {
+                    insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            keys = newKeys;
+            values = newValues;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
+            int bucket = (int) hash(key);
+
+            while (true) {
+                bucket = signSafeMod(bucket, keys.length);
+
+                V storedValue = values[bucket];
+
+                if (storedValue == EmptyValue) {
+                    // The bucket is empty, so we can use it
+                    keys[bucket] = key;
+                    values[bucket] = value;
+                    return;
+                }
+
+                ++bucket;
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
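+    // Bit-mixing function (multiply / xor-shift; constants as in 64-bit MurmurHash) that spreads entropy
+    // into both the low bits (bucket index) and the high 32 bits (used for section selection)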
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
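+    // The capacity is always a power of two, so masking with (max - 1) is a fast, non-negative modulo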
+    static final int signSafeMod(long n, int max) {
+        return (int) n & (max - 1);
+    }
+
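+    // Round up to the next power of two, e.g. 100 -> 128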
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
new file mode 100644
index 0000000..d02b0bc
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -0,0 +1,421 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Concurrent hash set for primitive longs.
+ *
+ * Provides similar methods as a ConcurrentSet&lt;Long&gt;, but since it's an open-addressing hash table with linear
+ * probing, no node allocations are required to store the values.
+ * <p>
+ * Items <strong>MUST</strong> be >= 0.
+ */
+public class ConcurrentLongHashSet {
+
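+    // Items stored in the set must be >= 0, so negative sentinels can mark empty and deleted buckets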
+    private static final long EmptyItem = -1L;
+    private static final long DeletedItem = -2L;
+
+    private static final float SetFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public static interface ConsumerLong {
+        void accept(long item);
+    }
+
+    public ConcurrentLongHashSet() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongHashSet(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    public boolean contains(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).contains(item, (int) h);
+    }
+
+    public boolean add(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).add(item, (int) h);
+    }
+
+    /**
+     * Remove an existing entry if found
+     *
+     * @param item
+     * @return true if removed or false if item was not present
+     */
+    public boolean remove(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).remove(item, (int) h);
+    }
+
+    private final Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(ConsumerLong processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public Set<Long> items() {
+        Set<Long> items = new HashSet<>();
+        forEach(items::add);
+        return items;
+    }
+
+    // A section is a portion of the hash set that is covered by a single stamped lock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+            Arrays.fill(table, EmptyItem);
+        }
+
+        boolean contains(long item, int hash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(hash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedItem = table[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (item == storedItem) {
+                            return true;
+                        } else if (storedItem == EmptyItem) {
+                            // Not found
+                            return false;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(hash, capacity);
+                            storedItem = table[bucket];
+                        }
+
+                        if (item == storedItem) {
+                            return true;
+                        } else if (storedItem == EmptyItem) {
+                            // Not found
+                            return false;
+                        }
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean add(long item, long hash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(hash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedItem = -1;
+
+            try {
+                while (true) {
+                    long storedItem = table[bucket];
+
+                    if (item == storedItem) {
+                        // Item was already in set
+                        return false;
+                    } else if (storedItem == EmptyItem) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedItem != -1) {
+                            bucket = firstDeletedItem;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = item;
+                        ++size;
+                        return true;
+                    } else if (storedItem == DeletedItem) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedItem == -1) {
+                            firstDeletedItem = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long item, int hash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(hash, capacity);
+
+            try {
+                while (true) {
+                    long storedItem = table[bucket];
+                    if (item == storedItem) {
+                        --size;
+
+                        cleanBucket(bucket);
+                        return true;
+
+                    } else if (storedItem == EmptyItem) {
+                        // Key wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
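+        // If the next bucket in the probe chain is empty, this bucket can be reset to empty as well;
+        // otherwise it must be tombstoned so that lookups keep probing past it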
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 1) & (table.length - 1);
+            if (table[nextInArray] == EmptyItem) {
+                table[bucket] = EmptyItem;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedItem;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyItem);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(ConsumerLong processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket++) {
+                    long storedItem = table[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedItem = table[bucket];
+                    }
+
+                    if (storedItem != DeletedItem && storedItem != EmptyItem) {
+                        processor.accept(storedItem);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[newCapacity];
+            Arrays.fill(newTable, EmptyItem);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i++) {
+                long storedItem = table[i];
+                if (storedItem != EmptyItem && storedItem != DeletedItem) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedItem);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * SetFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
+            int bucket = signSafeMod(hash(item), capacity);
+
+            while (true) {
+                long storedKey = table[bucket];
+
+                if (storedKey == EmptyItem) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = item;
+                    return;
+                }
+
+                bucket = (bucket + 1) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1));
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Items must be >= 0");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
new file mode 100644
index 0000000..7b5b5c2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -0,0 +1,723 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Concurrent hash map from primitive long to long.
+ *
+ * Provides similar methods as a ConcurrentMap&lt;Long, Long&gt;, but since it's an open-addressing hash table with
+ * linear probing, no node allocations are required to store the values.
+ * <p>
+ * Keys and values <strong>MUST</strong> be >= 0.
+ */
+public class ConcurrentLongLongHashMap {
+
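+    // Keys and values must be >= 0, so negative sentinels can mark empty/deleted keys and missing values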
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public static interface BiConsumerLong {
+        void accept(long key, long value);
+    }
+
+    public static interface LongLongFunction {
+        long apply(long key);
+    }
+
+    public static interface LongLongPredicate {
+        boolean test(long key, long value);
+    }
+
+    public ConcurrentLongLongHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongLongHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     *
+     * @param key
+     * @return the value or -1 if the key was not present
+     */
+    public long get(long key) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(long key) {
+        return get(key) != ValueNotFound;
+    }
+
+    public long put(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public long putIfAbsent(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public long computeIfAbsent(long key, LongLongFunction provider) {
+        checkBiggerEqualZero(key);
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, ValueNotFound, (int) h, true, provider);
+    }
+
+    /**
+     * Atomically add the specified delta to the current value identified by the key. If the entry was not in the map, a
+     * new entry with default value 0 is added and then the delta is added.
+     *
+     * @param key
+     *            the entry key
+     * @param delta
+     *            the delta to add
+     * @return the new value of the entry
+     * @throws IllegalArgumentException
+     *             if the delta was invalid, e.g. if it would have caused the value to become < 0
+     */
+    public long addAndGet(long key, long delta) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).addAndGet(key, delta, (int) h);
+    }
+
+    /**
+     * Change the value for a specific key only if it matches the current value.
+     *
+     * @param key the entry key
+     * @param currentValue the expected current value, or -1 if the key is expected to be absent
+     * @param newValue the new value to set
+     * @return true if the value was updated (or inserted, when currentValue is -1), false otherwise
+     */
+    public boolean compareAndSet(long key, long currentValue, long newValue) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(newValue);
+        long h = hash(key);
+        return getSection(h).compareAndSet(key, currentValue, newValue, (int) h);
+    }
+
+    /**
+     * Remove an existing entry if found
+     *
+     * @param key
+     * @return the value associated with the key or -1 if key was not present
+     */
+    public long remove(long key) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).remove(key, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != ValueNotFound;
+    }
+
+    public int removeIf(LongPredicate filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    public int removeIf(LongLongPredicate filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    private final Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumerLong processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<Long> keys() {
+        List<Long> keys = Lists.newArrayList();
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    public List<Long> values() {
+        List<Long> values = Lists.newArrayList();
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    public Map<Long, Long> asMap() {
+        Map<Long, Long> map = Maps.newHashMap();
+        forEach((key, value) -> map.put(key, value));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single stamped lock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[2 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        long get(long key, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key == storedKey) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return ValueNotFound;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey = table[bucket];
+                            storedValue = table[bucket + 1];
+                        }
+
+                        if (key == storedKey) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return ValueNotFound;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        long put(long key, long value, int keyHash, boolean onlyIfAbsent, LongLongFunction valueProvider) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            table[bucket + 1] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        if (value == ValueNotFound) {
+                            value = valueProvider.apply(key);
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = value;
+                        ++size;
+                        return valueProvider != null ? value : ValueNotFound;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        long addAndGet(long key, long delta, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        // Key found: add the delta to the existing value
+                        long newValue = storedValue + delta;
+                        checkBiggerEqualZero(newValue);
+
+                        table[bucket + 1] = newValue;
+                        return newValue;
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        checkBiggerEqualZero(delta);
+
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = delta;
+                        ++size;
+                        return delta;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        boolean compareAndSet(long key, long currentValue, long newValue, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        if (storedValue != currentValue) {
+                            return false;
+                        }
+
+                        // Overwrite the old value for the same key
+                        table[bucket + 1] = newValue;
+                        return true;
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map.
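+                        // A currentValue of ValueNotFound (-1) means the caller expects the key to be absent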
+                        if (currentValue == ValueNotFound) {
+                            if (firstDeletedKey != -1) {
+                                bucket = firstDeletedKey;
+                            } else {
+                                ++usedBuckets;
+                            }
+
+                            table[bucket] = key;
+                            table[bucket + 1] = newValue;
+                            ++size;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private long remove(long key, long value, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+                    if (key == storedKey) {
+                        if (value == ValueNotFound || value == storedValue) {
+                            --size;
+
+                            cleanBucket(bucket);
+                            return storedValue;
+                        } else {
+                            return ValueNotFound;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Key wasn't found
+                        return ValueNotFound;
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        int removeIf(LongPredicate filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        int removeIf(LongLongPredicate filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 2) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = ValueNotFound;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumerLong processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = table[bucket];
+                        storedValue = table[bucket + 1];
+                    }
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[2 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 2) {
+                long storedKey = table[i];
+                long storedValue = table[i + 1];
+                if (storedKey != EmptyKey && storedKey != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
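+            // Deleted-key tombstones are dropped during re-insertion, so every used bucket
+            // now holds a live entry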
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key, long value) {
+            int bucket = signSafeMod(hash(key), capacity);
+
+            while (true) {
+                long storedKey = table[bucket];
+
+                if (storedKey == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key;
+                    table[bucket + 1] = value;
+                    return;
+                }
+
+                bucket = (bucket + 2) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
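+    // 64-bit mixing step; the multiplier and shift match the constants of the 64-bit
+    // MurmurHash2 finalizer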
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
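+    // Reduce the hash modulo the power-of-two capacity, then scale by 2 because each
+    // logical bucket occupies two consecutive array slots (key, value)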
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 1;
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
new file mode 100644
index 0000000..7677735
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -0,0 +1,550 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Concurrent hash map where both keys and values are composed of pairs of longs.
+ * <p>
+ * (long,long) --&gt; (long,long)
+ * <p>
+ * Provides methods similar to {@code ConcurrentMap<K, V>}, but since it is an open hash map with linear probing,
+ * no node allocations are required to store the keys and values, and no boxing is required.
+ * <p>
+ * Keys and values <strong>MUST</strong> be &gt;= 0.
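+ * <p>
+ * A minimal usage sketch (all methods shown are declared in this class):
+ * <pre>{@code
+ * ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+ * map.put(1L, 2L, 100L, 200L);            // maps (1, 2) --> (100, 200)
+ * LongPair value = map.get(1L, 2L);       // value.first == 100, value.second == 200
+ * boolean removed = map.remove(1L, 2L);   // true: the entry was present
+ * }</pre>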
+ */
+public class ConcurrentLongLongPairHashMap {
+
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public interface BiConsumerLongPair {
+        void accept(long key1, long key2, long value1, long value2);
+    }
+
+    public interface LongLongPairFunction {
+        long apply(long key1, long key2);
+    }
+
+    public interface LongLongPairPredicate {
+        boolean test(long key1, long key2, long value1, long value2);
+    }
+
+    public ConcurrentLongLongPairHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongLongPairHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
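+        // Spread the expected items evenly across the sections; each section rounds its own
+        // capacity up to a power of two so that bucket indexing can use bit-masking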
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     * @param key1 first component of the key
+     * @param key2 second component of the key
+     * @return the value pair associated with the key, or null if the key was not present
+     */
+    public LongPair get(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).get(key1, key2, (int) h);
+    }
+
+    public boolean containsKey(long key1, long key2) {
+        return get(key1, key2) != null;
+    }
+
+    public boolean put(long key1, long key2, long value1, long value2) {
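+        // Only the first components are range-checked: the EmptyKey/DeletedKey/ValueNotFound
+        // sentinels are only ever compared against the first slot of a stored key or value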
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, false);
+    }
+
+    public boolean putIfAbsent(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, true);
+    }
+
+    /**
+     * Remove an existing entry if found.
+     *
+     * @param key1 first component of the key
+     * @param key2 second component of the key
+     * @return true if the entry was present and has been removed
+     */
+    public boolean remove(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, ValueNotFound, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, value1, value2, (int) h);
+    }
+
+    private Section getSection(long hash) {
+        // Use the 32 most significant bits of the hash to select the section; the low bits
+        // are consumed by signSafeMod to select the bucket, so the two choices stay independent
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumerLongPair processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<LongPair> keys() {
+        List<LongPair> keys = Lists.newArrayList();
+        forEach((key1, key2, value1, value2) -> keys.add(new LongPair(key1, key2)));
+        return keys;
+    }
+
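+    /**
+     * @return a new list of all values (makes a copy)
+     */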
+    public List<LongPair> values() {
+        List<LongPair> values = Lists.newArrayList();
+        forEach((key1, key2, value1, value2) -> values.add(new LongPair(value1, value2)));
+        return values;
+    }
+
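+    /**
+     * @return a new map containing a copy of all entries
+     */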
+    public Map<LongPair, LongPair> asMap() {
+        Map<LongPair, LongPair> map = Maps.newHashMap();
+        forEach((key1, key2, value1, value2) -> map.put(new LongPair(key1, key2), new LongPair(value1, value2)));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single stamped lock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[4 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        LongPair get(long key1, long key2, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey1 = table[bucket];
+                            storedKey2 = table[bucket + 1];
+                            storedValue1 = table[bucket + 2];
+                            storedValue2 = table[bucket + 3];
+                        }
+
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean onlyIfAbsent) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember the first deleted bucket we encounter, so the new entry can reuse it
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            table[bucket + 2] = value1;
+                            table[bucket + 3] = value2;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key1;
+                        table[bucket + 1] = key2;
+                        table[bucket + 2] = value1;
+                        table[bucket + 3] = value2;
+                        ++size;
+                        return true;
+                    } else if (storedKey1 == DeletedKey) {
+                        // The bucket holds a tombstone from a deleted key; remember the first one
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long key1, long key2, long value1, long value2, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (value1 == ValueNotFound || (value1 == storedValue1 && value2 == storedValue2)) {
+                            --size;
+
+                            cleanBucket(bucket);
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Key wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 4) & (table.length - 1);
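+            // Empty the bucket outright only when the probe chain ends at the next bucket;
+            // otherwise leave a tombstone so that longer probe chains remain reachable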
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = EmptyKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = DeletedKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumerLongPair processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 4) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey1 = table[bucket];
+                        storedKey2 = table[bucket + 1];
+                        storedValue1 = table[bucket + 2];
+                        storedValue2 = table[bucket + 3];
+                    }
+
+                    if (storedKey1 != DeletedKey && storedKey1 != EmptyKey) {
+                        processor.accept(storedKey1, storedKey2, storedValue1, storedValue2);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[4 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 4) {
+                long storedKey1 = table[i];
+                long storedKey2 = table[i + 1];
+                long storedValue1 = table[i + 2];
+                long storedValue2 = table[i + 3];
+                if (storedKey1 != EmptyKey && storedKey1 != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey1, storedKey2, storedValue1, storedValue2);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
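+            // Tombstones were dropped during re-insertion, so used buckets == live entries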
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,
+                long value2) {
+            int bucket = signSafeMod(hash(key1, key2), capacity);
+
+            while (true) {
+                long storedKey1 = table[bucket];
+
+                if (storedKey1 == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key1;
+                    table[bucket + 1] = key2;
+                    table[bucket + 2] = value1;
+                    table[bucket + 3] = value2;
+                    return;
+                }
+
+                bucket = (bucket + 4) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key1, long key2) {
+        long hash = key1 * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        hash += 31 + (key2 * HashMixer);
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
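+    // Scale by 4 here: each logical bucket spans four consecutive array slots
+    // (key1, key2, value1, value2)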
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 2;
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+
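+    /**
+     * Immutable pair of longs, used for both the keys and the values of this map.
+     */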
+    public static class LongPair implements Comparable<LongPair> {
+        public final long first;
+        public final long second;
+
+        public LongPair(long first, long second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof LongPair) {
+                LongPair other = (LongPair) obj;
+                return first == other.first && second == other.second;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) hash(first, second);
+        }
+
+        @Override
+        public int compareTo(LongPair o) {
+            if (first != o.first) {
+                return Long.compare(first, o.first);
+            } else {
+                return Long.compare(second, o.second);
+            }
+        }
+    }
+}