You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/29 23:18:01 UTC

[2/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and sets for primitive types

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
new file mode 100644
index 0000000..90fc548
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -0,0 +1,493 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash map
+ *
+ * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map with linear probing, no node
+ * allocations are required to store the values
+ *
+ * @param <V>
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashMap<K, V> {
+
+    private static final Object EmptyKey = null;
+    private static final Object DeletedKey = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<K, V>[] sections;
+
+    public ConcurrentOpenHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<K, V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<K, V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<K, V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<K, V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public V get(K key) {
+        checkNotNull(key);
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(K key) {
+        return get(key) != null;
+    }
+
+    public V put(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public V putIfAbsent(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public V computeIfAbsent(K key, Function<K, V> provider) {
+        checkNotNull(key);
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, null, (int) h, true, provider);
+    }
+
+    public V remove(K key) {
+        checkNotNull(key);
+        long h = hash(key);
+        return getSection(h).remove(key, null, (int) h);
+    }
+
+    public boolean remove(K key, Object value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != null;
+    }
+
+    private Section<K, V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<K, V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumer<? super K, ? super V> processor) {
+        for (Section<K, V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    public int removeIf(BiPredicate<K, V> filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section<K,V> s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<K> keys() {
+        List<K> keys = Lists.newArrayList();
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    public List<V> values() {
+        List<V> values = Lists.newArrayList();
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    // A section is a portion of the hash map that is covered by a single
+    @SuppressWarnings("serial")
+    private static final class Section<K, V> extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private Object[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new Object[2 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        V get(K key, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key.equals(storedKey)) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey = (K) table[bucket];
+                            storedValue = (V) table[bucket + 1];
+                        }
+
+                        if (key.equals(storedKey)) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        V put(K key, V value, int keyHash, boolean onlyIfAbsent, Function<K, V> valueProvider) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (key.equals(storedKey)) {
+                        if (!onlyIfAbsent) {
+                            // Over written an old value for same key
+                            table[bucket + 1] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        if (value == null) {
+                            value = valueProvider.apply(key);
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = value;
+                        ++size;
+                        return valueProvider != null ? value : null;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private V remove(K key, Object value, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+                    if (key.equals(storedKey)) {
+                        if (value == null || value.equals(storedValue)) {
+                            --size;
+                            cleanBucket(bucket);
+                            return storedValue;
+                        } else {
+                            return null;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Key wasn't found
+                        return null;
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumer<? super K, ? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            Object[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = (K) table[bucket];
+                        storedValue = (V) table[bucket + 1];
+                    }
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        int removeIf(BiPredicate<K, V> filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private final void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 2) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = null;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = null;
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            Object[] newTable = new Object[2 * newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 2) {
+                K storedKey = (K) table[i];
+                V storedValue = (V) table[i + 1];
+                if (storedKey != EmptyKey && storedKey != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
+            int bucket = signSafeMod(hash(key), capacity);
+
+            while (true) {
+                K storedKey = (K) table[bucket];
+
+                if (storedKey == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key;
+                    table[bucket + 1] = value;
+                    return;
+                }
+
+                bucket = (bucket + 2) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995l;
+    private static final int R = 47;
+
+    final static <K> long hash(K key) {
+        long hash = key.hashCode() * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int Max) {
+        return (int) (n & (Max - 1)) << 1;
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
new file mode 100644
index 0000000..99a552d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -0,0 +1,416 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash set
+ * 
+ * Provides similar methods as a ConcurrentMap<K,V> but since it's an open hash map with linear probing, no node
+ * allocations are required to store the values
+ *
+ * @param <V>
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashSet<V> {
+
+    private static final Object EmptyValue = null;
+    private static final Object DeletedValue = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<V>[] sections;
+
+    public ConcurrentOpenHashSet() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public boolean contains(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).contains(value, (int) h);
+    }
+
+    public boolean add(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).add(value, (int) h);
+    }
+
+    public boolean remove(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).remove(value, (int) h);
+    }
+
+    private Section<V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(Consumer<? super V> processor) {
+        for (Section<V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all values (makes a copy)
+     */
+    List<V> values() {
+        List<V> values = Lists.newArrayList();
+        forEach(value -> values.add(value));
+        return values;
+    }
+
+    // A section is a portion of the hash map that is covered by a single
+    @SuppressWarnings("serial")
+    private static final class Section<V> extends StampedLock {
+        private V[] values;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.values = (V[]) new Object[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        boolean contains(V value, int keyHash) {
+            int bucket = keyHash;
+
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    // First try optimistic locking
+                    V storedValue = values[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (value.equals(storedValue)) {
+                            return true;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return false;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            storedValue = values[bucket];
+                        }
+
+                        if (capacity != this.capacity) {
+                            // There has been a rehashing. We need to restart the search
+                            bucket = keyHash;
+                            continue;
+                        }
+
+                        if (value.equals(storedValue)) {
+                            return true;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return false;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean add(V value, int keyHash) {
+            int bucket = keyHash;
+
+            long stamp = writeLock();
+            int capacity = this.capacity;
+
+            // Remember where we find the first available spot
+            int firstDeletedValue = -1;
+
+            try {
+                while (true) {
+                    bucket = signSafeMod(bucket, capacity);
+
+                    V storedValue = values[bucket];
+
+                    if (value.equals(storedValue)) {
+                        return false;
+                    } else if (storedValue == EmptyValue) {
+                        // Found an empty bucket. This means the value is not in the set. If we've already seen a
+                        // deleted value, we should write at that position
+                        if (firstDeletedValue != -1) {
+                            bucket = firstDeletedValue;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        values[bucket] = value;
+                        ++size;
+                        return true;
+                    } else if (storedValue == DeletedValue) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedValue == -1) {
+                            firstDeletedValue = bucket;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(V value, int keyHash) {
+            int bucket = keyHash;
+            long stamp = writeLock();
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    V storedValue = values[bucket];
+                    if (value.equals(storedValue)) {
+                        --size;
+
+                        int nextInArray = signSafeMod(bucket + 1, capacity);
+                        if (values[nextInArray] == EmptyValue) {
+                            values[bucket] = (V) EmptyValue;
+                            --usedBuckets;
+                        } else {
+                            values[bucket] = (V) DeletedValue;
+                        }
+
+                        return true;
+                    } else if (storedValue == EmptyValue) {
+                        // Value wasn't found
+                        return false;
+                    }
+
+                    ++bucket;
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(values, EmptyValue);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(Consumer<? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            int capacity = this.capacity;
+            V[] values = this.values;
+
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+
+                    capacity = this.capacity;
+                    values = this.values;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    V storedValue = values[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedValue = values[bucket];
+                    }
+
+                    if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                        processor.accept(storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            V[] newValues = (V[]) new Object[newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < values.length; i++) {
+                V storedValue = values[i];
+                if (storedValue != EmptyValue && storedValue != DeletedValue) {
+                    insertValueNoLock(newValues, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            values = newValues;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <V> void insertValueNoLock(V[] values, V value) {
+            int bucket = (int) hash(value);
+
+            while (true) {
+                bucket = signSafeMod(bucket, values.length);
+
+                V storedValue = values[bucket];
+
+                if (storedValue == EmptyValue) {
+                    // The bucket is empty, so we can use it
+                    values[bucket] = value;
+                    return;
+                }
+
+                ++bucket;
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995l;
+    private static final int R = 47;
+
+    final static <K> long hash(K key) {
+        long hash = key.hashCode() * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int Max) {
+        return (int) n & (Max - 1);
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
new file mode 100644
index 0000000..44f42b8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongFunction;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ConcurrentLongHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongHashMap<String>(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashMap<String>(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashMap<String>(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        assertNull(map.put(2, "two"));
+        assertNull(map.put(3, "three"));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1), "one");
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.remove(1), "one");
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1), null);
+        assertEquals(map.get(5), null);
+        assertEquals(map.size(), 2);
+
+        assertNull(map.put(1, "one"));
+        assertEquals(map.size(), 3);
+        assertEquals(map.put(1, "uno"), "one");
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, "zero"));
+        assertFalse(map.remove(1, "uno"));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, "one"));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+        map.put(0, "zero");
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, "zero1");
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, "zero");
+
+        assertEquals(map.keys(), Lists.newArrayList(0l));
+        assertEquals(map.values(), Lists.newArrayList("zero"));
+
+        map.remove(0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, "zero");
+        map.put(1, "one");
+        map.put(2, "two");
+
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0l, 1l, 2l));
+
+        List<String> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("one", "two", "zero"));
+
+        map.put(1, "uno");
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0l, 1l, 2l));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertEquals(map.put(key1, "value-1"), null);
+        assertEquals(map.put(key2, "value-2"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key1, "value-1-overwrite"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1-overwrite");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
+        assertEquals(map.get(key2), "value-2-overwrite");
+
+        assertEquals(map.size(), 1);
+        assertEquals(map.remove(key2), "value-2-overwrite");
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        assertEquals(map.putIfAbsent(1, "one"), null);
+        assertEquals(map.get(1), "one");
+
+        assertEquals(map.putIfAbsent(1, "uno"), "one");
+        assertEquals(map.get(1), "one");
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+        AtomicInteger counter = new AtomicInteger();
+        LongFunction<Integer> provider = new LongFunction<Integer>() {
+            public Integer apply(long key) {
+                return counter.getAndIncrement();
+            }
+        };
+
+        assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
+        assertEquals(map.get(0).intValue(), 0);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
+        assertEquals(map.get(2).intValue(), 2);
+    }
+
+    final static int Iterations = 1;
+    final static int ReadIterations = 100;
+    final static int N = 1_000_000;
+
+    public void benchConcurrentLongHashMap() throws Exception {
+        // public static void main(String args[]) {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put(i, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get(i);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove(i);
+            }
+        }
+    }
+
+    public void benchConcurrentHashMap() throws Exception {
+        ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<Long, String>(N, 0.66f, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put(i, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get(i);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove(i);
+            }
+        }
+    }
+
+    void benchHashMap() throws Exception {
+        HashMap<Long, String> map = new HashMap<Long, String>(N, 0.66f);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put(i, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get(i);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove(i);
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
+
+        long start = System.nanoTime();
+        // t.benchHashMap();
+        long end = System.nanoTime();
+
+        System.out.println("HM:   " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CHM:  " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        // t.benchConcurrentLongHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
new file mode 100644
index 0000000..5a8d904
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -0,0 +1,275 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ConcurrentLongHashSetTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongHashSet(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashSet(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashSet(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add(1));
+        assertFalse(set.isEmpty());
+
+        assertTrue(set.add(2));
+        assertTrue(set.add(3));
+
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.contains(1));
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.remove(1));
+        assertEquals(set.size(), 2);
+        assertFalse(set.contains(1));
+        assertFalse(set.contains(5));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.add(1));
+        assertEquals(set.size(), 3);
+        assertFalse(set.add(1));
+        assertEquals(set.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add(1));
+        assertFalse(set.isEmpty());
+
+        assertFalse(set.remove(0));
+        assertFalse(set.isEmpty());
+        assertTrue(set.remove(1));
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            set.add(i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            set.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    set.add(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(set.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashSet map = new ConcurrentLongHashSet();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.add(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+        assertEquals(set.items(), Collections.emptySet());
+
+        set.add(0l);
+
+        assertEquals(set.items(), Sets.newHashSet(0l));
+
+        set.remove(0l);
+
+        assertEquals(set.items(), Collections.emptySet());
+
+        set.add(0l);
+        set.add(1l);
+        set.add(2l);
+
+        List<Long> values = Lists.newArrayList(set.items());
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0l, 1l, 2l));
+
+        set.clear();
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key1), Buckets);
+        int bucket2 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertTrue(set.add(key1));
+        assertTrue(set.add(key2));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertTrue(set.add(key1));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertFalse(set.add(key2));
+        assertTrue(set.contains(key2));
+
+        assertEquals(set.size(), 1);
+        assertTrue(set.remove(key2));
+        assertTrue(set.isEmpty());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
new file mode 100644
index 0000000..a7492a1
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class ConcurrentLongLongHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongLongHashMap(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongHashMap(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongHashMap(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16);
+
+        assertTrue(map.isEmpty());
+        assertEquals(map.put(1, 11), -1);
+        assertFalse(map.isEmpty());
+
+        assertEquals(map.put(2, 22), -1);
+        assertEquals(map.put(3, 33), -1);
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1), 11);
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.remove(1), 11);
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1), -1);
+        assertEquals(map.get(5), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.put(1, 11), -1);
+        assertEquals(map.size(), 3);
+        assertEquals(map.put(1, 111), 11);
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+        assertTrue(map.isEmpty());
+        assertEquals(map.put(1, 11), -1);
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(0l));
+        assertEquals(map.values(), Lists.newArrayList(0l));
+
+        map.remove(0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0);
+        map.put(1, 11);
+        map.put(2, 22);
+
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0l, 1l, 2l));
+
+        List<Long> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0l, 11l, 22l));
+
+        map.put(1, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0l, 1l, 2l));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0l, 22l, 111l));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        final long value1 = 1;
+        final long value2 = 2;
+        final long value1Overwrite = 3;
+        final long value2Overwrite = 3;
+
+        assertEquals(map.put(key1, value1), -1);
+        assertEquals(map.put(key2, value2), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), value1);
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key1, value1Overwrite), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), value1Overwrite);
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key2, value2Overwrite), value2);
+        assertEquals(map.get(key2), value2Overwrite);
+
+        assertEquals(map.size(), 1);
+        assertEquals(map.remove(key2), value2Overwrite);
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        assertEquals(map.putIfAbsent(1, 11), -1);
+        assertEquals(map.get(1), 11);
+
+        assertEquals(map.putIfAbsent(1, 111), 11);
+        assertEquals(map.get(1), 11);
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        AtomicLong counter = new AtomicLong();
+        LongLongFunction provider = new LongLongFunction() {
+            public long apply(long key) {
+                return counter.getAndIncrement();
+            }
+        };
+
+        assertEquals(map.computeIfAbsent(0, provider), 0);
+        assertEquals(map.get(0), 0);
+
+        assertEquals(map.computeIfAbsent(1, provider), 1);
+        assertEquals(map.get(1), 1);
+
+        assertEquals(map.computeIfAbsent(1, provider), 1);
+        assertEquals(map.get(1), 1);
+
+        assertEquals(map.computeIfAbsent(2, provider), 2);
+        assertEquals(map.get(2), 2);
+    }
+
+    @Test
+    public void testAddAndGet() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        assertEquals(map.addAndGet(0, 0), 0);
+        assertEquals(map.containsKey(0), true);
+        assertEquals(map.get(0), 0);
+
+        assertEquals(map.containsKey(5), false);
+
+        assertEquals(map.addAndGet(0, 5), 5);
+        assertEquals(map.get(0), 5);
+
+        assertEquals(map.addAndGet(0, 1), 6);
+        assertEquals(map.get(0), 6);
+
+        assertEquals(map.addAndGet(0, -2), 4);
+        assertEquals(map.get(0), 4);
+
+        // Cannot bring to value to negative
+        try {
+            map.addAndGet(0, -5);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+        assertEquals(map.get(0), 4);
+    }
+
+    @Test
+    public void testRemoveIf() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(1L, 1L);
+        map.put(2L, 2L);
+        map.put(3L, 3L);
+        map.put(4L, 4L);
+
+        map.removeIf(key -> key < 3);
+        assertFalse(map.containsKey(1L));
+        assertFalse(map.containsKey(2L));
+        assertTrue(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
+    @Test
+    public void testRemoveIfValue() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(1L, 1L);
+        map.put(2L, 2L);
+        map.put(3L, 1L);
+        map.put(4L, 2L);
+
+        map.removeIf((key, value) -> value < 2);
+        assertFalse(map.containsKey(1L));
+        assertTrue(map.containsKey(2L));
+        assertFalse(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
+    @Test
+    public void testIvalidKeys() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        try {
+            map.put(-5, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.computeIfAbsent(-1, new LongLongFunction() {
+                @Override
+                public long apply(long key) {
+                    return 1;
+                }
+            });
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongHashMap lmap = new ConcurrentLongLongHashMap(16, 1);
+        lmap.put(1, 11);
+        lmap.put(2, 22);
+        lmap.put(3, 33);
+
+        Map<Long, Long> map = Maps.newTreeMap();
+        map.put(1l, 11l);
+        map.put(2l, 22l);
+        map.put(3l, 33l);
+
+        assertEquals(map, lmap.asMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
new file mode 100644
index 0000000..23aa327
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class ConcurrentLongLongPairHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongLongPairHashMap(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongPairHashMap(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongPairHashMap(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16);
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+        assertEquals(map.size(), 3);
+
+        assertTrue(map.remove(1, 1));
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1, 1), null);
+        assertEquals(map.get(5, 5), null);
+        assertEquals(map.size(), 2);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertEquals(map.size(), 3);
+        assertTrue(map.put(1, 1, 111, 111));
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 1, 111, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 1, 11, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+        map.put(0, 0, 0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 0, 1, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i, i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i, i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are uniques
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(new LongPair(0, 0)));
+        assertEquals(map.values(), Lists.newArrayList(new LongPair(0, 0)));
+
+        map.remove(0, 0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+        map.put(1, 1, 11, 11);
+        map.put(2, 2, 22, 22);
+
+        List<LongPair> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        List<LongPair> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(11, 11), new LongPair(22, 22)));
+
+        map.put(1, 1, 111, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(22, 22), new LongPair(111, 111)));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        assertTrue(map.putIfAbsent(1, 1, 11, 11));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+
+        assertFalse(map.putIfAbsent(1, 1, 111, 111));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+    }
+
+    @Test
+    public void testIvalidKeys() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+        try {
+            map.put(-5, 3, 4, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1, 1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongPairHashMap lmap = new ConcurrentLongLongPairHashMap(16, 1);
+        lmap.put(1, 1, 11, 11);
+        lmap.put(2, 2, 22, 22);
+        lmap.put(3, 3, 33, 33);
+
+        Map<LongPair, LongPair> map = Maps.newTreeMap();
+        map.put(new LongPair(1, 1), new LongPair(11, 11));
+        map.put(new LongPair(2, 2), new LongPair(22, 22));
+        map.put(new LongPair(3, 3), new LongPair(33, 33));
+
+        assertEquals(map, lmap.asMap());
+    }
+}