Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/29 23:18:00 UTC

[1/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and sets for primitive types

Repository: bookkeeper
Updated Branches:
  refs/heads/master 4cf097871 -> ecbb053e6


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
new file mode 100644
index 0000000..d1dd0be
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
@@ -0,0 +1,488 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ConcurrentOpenHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentOpenHashMap<String, String>(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentOpenHashMap<String, String>(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentOpenHashMap<String, String>(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16);
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put("1", "one"));
+        assertFalse(map.isEmpty());
+
+        assertNull(map.put("2", "two"));
+        assertNull(map.put("3", "three"));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get("1"), "one");
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.remove("1"), "one");
+        assertEquals(map.size(), 2);
+        assertEquals(map.get("1"), null);
+        assertEquals(map.get("5"), null);
+        assertEquals(map.size(), 2);
+
+        assertNull(map.put("1", "one"));
+        assertEquals(map.size(), 3);
+        assertEquals(map.put("1", "uno"), "one");
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>();
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put("1", "one"));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove("0", "zero"));
+        assertFalse(map.remove("1", "uno"));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove("1", "one"));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentOpenHashMap<String, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(Integer.toString(i), i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                    // Read the value back to exercise concurrent reads
+                    assertEquals(map.get(key), value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0L, "zero");
+
+        assertEquals(map.keys(), Lists.newArrayList(0L));
+        assertEquals(map.values(), Lists.newArrayList("zero"));
+
+        map.remove(0L);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0L, "zero");
+        map.put(1L, "one");
+        map.put(2L, "two");
+
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        List<String> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("one", "two", "zero"));
+
+        map.put(1L, "uno");
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentOpenHashMap.signSafeMod(ConcurrentOpenHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentOpenHashMap.signSafeMod(ConcurrentOpenHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertEquals(map.put(key1, "value-1"), null);
+        assertEquals(map.put(key2, "value-2"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key1, "value-1-overwrite"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1-overwrite");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
+        assertEquals(map.get(key2), "value-2-overwrite");
+
+        assertEquals(map.size(), 1);
+        assertEquals(map.remove(key2), "value-2-overwrite");
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>();
+        assertEquals(map.putIfAbsent(1L, "one"), null);
+        assertEquals(map.get(1L), "one");
+
+        assertEquals(map.putIfAbsent(1L, "uno"), "one");
+        assertEquals(map.get(1L), "one");
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentOpenHashMap<Integer, Integer> map = new ConcurrentOpenHashMap<>(16, 1);
+        AtomicInteger counter = new AtomicInteger();
+        Function<Integer, Integer> provider = key -> counter.getAndIncrement();
+
+        assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
+        assertEquals(map.get(0).intValue(), 0);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
+        assertEquals(map.get(2).intValue(), 2);
+    }
+
+    @Test
+    public void testRemoval() {
+        ConcurrentOpenHashMap<Integer, String> map = new ConcurrentOpenHashMap<>();
+        map.put(0, "0");
+        map.put(1, "1");
+        map.put(3, "3");
+        map.put(6, "6");
+        map.put(7, "7");
+
+        List<Integer> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0, 1, 3, 6, 7));
+
+        int numOfItemsDeleted = map.removeIf(new BiPredicate<Integer, String>() {
+            @Override
+            public boolean test(Integer k, String v) {
+                return k < 5;
+            }
+        });
+        assertEquals(numOfItemsDeleted, 3);
+        assertEquals(map.size(), keys.size() - numOfItemsDeleted);
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(6, 7));
+    }
+
+    @Test
+    public void testEqualsKeys() {
+        class T {
+            int value;
+
+            T(int value) {
+                this.value = value;
+            }
+
+            @Override
+            public int hashCode() {
+                return Integer.hashCode(value);
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (obj instanceof T) {
+                    return value == ((T) obj).value;
+                }
+
+                return false;
+            }
+        }
+
+        ConcurrentOpenHashMap<T, String> map = new ConcurrentOpenHashMap<>();
+
+        T t1 = new T(1);
+        T t1_b = new T(1);
+        T t2 = new T(2);
+
+        assertEquals(t1, t1_b);
+        assertFalse(t1.equals(t2));
+        assertFalse(t1_b.equals(t2));
+
+        assertNull(map.put(t1, "t1"));
+        assertEquals(map.get(t1), "t1");
+        assertEquals(map.get(t1_b), "t1");
+        assertNull(map.get(t2));
+
+        assertEquals(map.remove(t1_b), "t1");
+        assertNull(map.get(t1));
+        assertNull(map.get(t1_b));
+    }
+
+    static final int Iterations = 1;
+    static final int ReadIterations = 100;
+    static final int N = 1_000_000;
+
+    public void benchConcurrentOpenHashMap() throws Exception {
+        ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(N, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            // Insert N distinct keys
+            for (long j = 0; j < N; j++) {
+                map.put(j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (long j = 0; j < N; j++) {
+                    map.get(j);
+                }
+            }
+
+            for (long j = 0; j < N; j++) {
+                map.remove(j);
+            }
+        }
+    }
+
+    public void benchConcurrentHashMap() throws Exception {
+        ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<Long, String>(N, 0.66f, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (long j = 0; j < N; j++) {
+                map.put(j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (long j = 0; j < N; j++) {
+                    map.get(j);
+                }
+            }
+
+            for (long j = 0; j < N; j++) {
+                map.remove(j);
+            }
+        }
+    }
+
+    void benchHashMap() throws Exception {
+        HashMap<Long, String> map = new HashMap<Long, String>(N, 0.66f);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (long j = 0; j < N; j++) {
+                map.put(j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (long j = 0; j < N; j++) {
+                    map.get(j);
+                }
+            }
+
+            for (long j = 0; j < N; j++) {
+                map.remove(j);
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        ConcurrentOpenHashMapTest t = new ConcurrentOpenHashMapTest();
+
+        long start = System.nanoTime();
+        t.benchHashMap();
+        long end = System.nanoTime();
+
+        System.out.println("HM:   " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CHM:  " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentOpenHashMap();
+        end = System.nanoTime();
+
+        System.out.println("COHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+    }
+}

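A note on the bucket math: testHashConflictWithDeletion above depends on keys 1 and 27
landing in the same bucket of a 16-bucket section. A standalone sketch (not part of this
patch) that replicates the patch's hash() and signSafeMod(), which can be used to find
such colliding keys by hand:

    public class BucketSketch {
        private static final long HashMixer = 0xc6a4a7935bd1e995L;
        private static final int R = 47;

        // Same murmur-style mixing as ConcurrentOpenHashMap.hash()
        static <K> long hash(K key) {
            long hash = key.hashCode() * HashMixer;
            hash ^= hash >>> R;
            hash *= HashMixer;
            return hash;
        }

        // The index is doubled because keys and values are interleaved in one array
        static int signSafeMod(long n, int max) {
            return (int) (n & (max - 1)) << 1;
        }

        public static void main(String[] args) {
            // The test asserts these two print the same bucket index
            System.out.println(signSafeMod(hash(1L), 16));
            System.out.println(signSafeMod(hash(27L), 16));
        }
    }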
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
new file mode 100644
index 0000000..ec20718
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
@@ -0,0 +1,318 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ConcurrentOpenHashSetTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentOpenHashSet<String>(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentOpenHashSet<String>(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentOpenHashSet<String>(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>(16);
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add("1"));
+        assertFalse(set.isEmpty());
+
+        assertTrue(set.add("2"));
+        assertTrue(set.add("3"));
+
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.contains("1"));
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.remove("1"));
+        assertEquals(set.size(), 2);
+        assertFalse(set.contains("1"));
+        assertFalse(set.contains("5"));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.add("1"));
+        assertEquals(set.size(), 3);
+        assertFalse(set.add("1"));
+        assertEquals(set.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentOpenHashSet<String> set = new ConcurrentOpenHashSet<>();
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add("1"));
+        assertFalse(set.isEmpty());
+
+        assertFalse(set.remove("0"));
+        assertFalse(set.isEmpty());
+        assertTrue(set.remove("1"));
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentOpenHashSet<Integer> set = new ConcurrentOpenHashSet<>(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            set.add(i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            set.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    set.add(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(set.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    set.add(key);
+                    // Read back to exercise concurrent reads
+                    assertTrue(set.contains(key));
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(set.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>();
+
+        assertEquals(set.values(), Collections.emptyList());
+
+        set.add(0L);
+
+        assertEquals(set.values(), Lists.newArrayList(0L));
+
+        set.remove(0L);
+
+        assertEquals(set.values(), Collections.emptyList());
+
+        set.add(0L);
+        set.add(1L);
+        set.add(2L);
+
+        List<Long> values = set.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0L, 1L, 2L));
+
+        set.clear();
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentOpenHashSet<Long> set = new ConcurrentOpenHashSet<>(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key1), Buckets);
+        int bucket2 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertTrue(set.add(key1));
+        assertTrue(set.add(key2));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertTrue(set.add(key1));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertFalse(set.add(key2));
+        assertTrue(set.contains(key2));
+
+        assertEquals(set.size(), 1);
+        assertTrue(set.remove(key2));
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testEqualsObjects() {
+        class T {
+            int value;
+
+            T(int value) {
+                this.value = value;
+            }
+
+            @Override
+            public int hashCode() {
+                return Integer.hashCode(value);
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (obj instanceof T) {
+                    return value == ((T) obj).value;
+                }
+
+                return false;
+            }
+        }
+
+        ConcurrentOpenHashSet<T> set = new ConcurrentOpenHashSet<>();
+
+        T t1 = new T(1);
+        T t1_b = new T(1);
+        T t2 = new T(2);
+
+        assertEquals(t1, t1_b);
+        assertFalse(t1.equals(t2));
+        assertFalse(t1_b.equals(t2));
+
+        set.add(t1);
+        assertTrue(set.contains(t1));
+        assertTrue(set.contains(t1_b));
+        assertFalse(set.contains(t2));
+
+        assertTrue(set.remove(t1_b));
+        assertFalse(set.contains(t1));
+        assertFalse(set.contains(t1_b));
+    }
+
+}

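Both Section classes in the implementation files below extend
java.util.concurrent.locks.StampedLock and use its standard optimistic-read pattern: read
without locking, validate the stamp, and fall back to a real read lock only if a writer
intervened. A minimal sketch of that pattern on a hypothetical Point class (not from this
patch):

    import java.util.concurrent.locks.StampedLock;

    class Point extends StampedLock {
        private double x, y;

        void move(double dx, double dy) {
            long stamp = writeLock();
            try {
                x += dx;
                y += dy;
            } finally {
                unlockWrite(stamp);
            }
        }

        double distanceFromOrigin() {
            long stamp = tryOptimisticRead(); // lock-free attempt
            double curX = x, curY = y;
            if (!validate(stamp)) {           // a writer got in between the reads
                stamp = readLock();           // fall back to a blocking read lock
                try {
                    curX = x;
                    curY = y;
                } finally {
                    unlockRead(stamp);
                }
            }
            return Math.sqrt(curX * curX + curY * curY);
        }
    }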

[2/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and sets for primitive types

Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
new file mode 100644
index 0000000..90fc548
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -0,0 +1,493 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash map.
+ *
+ * Provides methods similar to a {@code ConcurrentMap<K,V>}, but since it is an open hash map with linear probing,
+ * no node allocations are required to store the keys and values.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashMap<K, V> {
+
+    private static final Object EmptyKey = null;
+    private static final Object DeletedKey = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<K, V>[] sections;
+
+    public ConcurrentOpenHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<K, V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<K, V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<K, V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<K, V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public V get(K key) {
+        checkNotNull(key);
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(K key) {
+        return get(key) != null;
+    }
+
+    public V put(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public V putIfAbsent(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public V computeIfAbsent(K key, Function<K, V> provider) {
+        checkNotNull(key);
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, null, (int) h, true, provider);
+    }
+
+    public V remove(K key) {
+        checkNotNull(key);
+        long h = hash(key);
+        return getSection(h).remove(key, null, (int) h);
+    }
+
+    public boolean remove(K key, Object value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != null;
+    }
+
+    private Section<K, V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<K, V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumer<? super K, ? super V> processor) {
+        for (Section<K, V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    public int removeIf(BiPredicate<K, V> filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section<K,V> s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<K> keys() {
+        List<K> keys = Lists.newArrayList();
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    public List<V> values() {
+        List<V> values = Lists.newArrayList();
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    // A section is a portion of the hash map that is covered by a single StampedLock
+    @SuppressWarnings("serial")
+    private static final class Section<K, V> extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private Object[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new Object[2 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        V get(K key, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key.equals(storedKey)) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey = (K) table[bucket];
+                            storedValue = (V) table[bucket + 1];
+                        }
+
+                        if (key.equals(storedKey)) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        V put(K key, V value, int keyHash, boolean onlyIfAbsent, Function<K, V> valueProvider) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (key.equals(storedKey)) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the existing value for the same key
+                            table[bucket + 1] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        if (value == null) {
+                            value = valueProvider.apply(key);
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = value;
+                        ++size;
+                        return valueProvider != null ? value : null;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private V remove(K key, Object value, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+                    if (key.equals(storedKey)) {
+                        if (value == null || value.equals(storedValue)) {
+                            --size;
+                            cleanBucket(bucket);
+                            return storedValue;
+                        } else {
+                            return null;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Key wasn't found
+                        return null;
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumer<? super K, ? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            Object[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = (K) table[bucket];
+                        storedValue = (V) table[bucket + 1];
+                    }
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        int removeIf(BiPredicate<K, V> filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private final void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 2) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = null;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = null;
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            Object[] newTable = new Object[2 * newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 2) {
+                K storedKey = (K) table[i];
+                V storedValue = (V) table[i + 1];
+                if (storedKey != EmptyKey && storedKey != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
+            int bucket = signSafeMod(hash(key), capacity);
+
+            while (true) {
+                K storedKey = (K) table[bucket];
+
+                if (storedKey == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key;
+                    table[bucket + 1] = value;
+                    return;
+                }
+
+                bucket = (bucket + 2) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final <K> long hash(K key) {
+        long hash = key.hashCode() * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int Max) {
+        return (int) (n & (Max - 1)) << 1;
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        // Round up to the next power of two
+        return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

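For reference, a minimal usage sketch of the map API added above (not part of the patch;
assumes bookkeeper-server on the classpath):

    import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;

    public class OpenHashMapExample {
        public static void main(String[] args) {
            // expectedItems = 1024, concurrencyLevel = 16 sections
            ConcurrentOpenHashMap<Long, String> map = new ConcurrentOpenHashMap<>(1024, 16);

            map.put(1L, "one");
            map.putIfAbsent(1L, "uno");               // no-op: key already present
            String two = map.computeIfAbsent(2L, key -> "two");

            System.out.println(map.get(1L));          // one
            System.out.println(two);                  // two

            // Bulk removal without allocating an iterator
            int removed = map.removeIf((key, value) -> key < 2L);
            System.out.println(removed + " removed, size=" + map.size());
        }
    }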
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
new file mode 100644
index 0000000..99a552d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -0,0 +1,416 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash set.
+ *
+ * Provides methods similar to a concurrent {@code Set<V>}, but since it is an open hash set with linear probing,
+ * no node allocations are required to store the values.
+ *
+ * @param <V> the value type
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashSet<V> {
+
+    private static final Object EmptyValue = null;
+    private static final Object DeletedValue = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<V>[] sections;
+
+    public ConcurrentOpenHashSet() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public boolean contains(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).contains(value, (int) h);
+    }
+
+    public boolean add(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).add(value, (int) h);
+    }
+
+    public boolean remove(V value) {
+        checkNotNull(value);
+        long h = hash(value);
+        return getSection(h).remove(value, (int) h);
+    }
+
+    private Section<V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(Consumer<? super V> processor) {
+        for (Section<V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all values (makes a copy)
+     */
+    List<V> values() {
+        List<V> values = Lists.newArrayList();
+        forEach(value -> values.add(value));
+        return values;
+    }
+
+    // A section is a portion of the hash set that is covered by a single StampedLock
+    @SuppressWarnings("serial")
+    private static final class Section<V> extends StampedLock {
+        private V[] values;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.values = (V[]) new Object[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        boolean contains(V value, int keyHash) {
+            int bucket = keyHash;
+
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    // First try optimistic locking
+                    V storedValue = values[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (value.equals(storedValue)) {
+                            return true;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return false;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            storedValue = values[bucket];
+                        }
+
+                        if (capacity != this.capacity) {
+                            // There has been a rehashing. We need to restart the search
+                            bucket = keyHash;
+                            continue;
+                        }
+
+                        if (value.equals(storedValue)) {
+                            return true;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return false;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean add(V value, int keyHash) {
+            int bucket = keyHash;
+
+            long stamp = writeLock();
+            int capacity = this.capacity;
+
+            // Remember where we find the first available spot
+            int firstDeletedValue = -1;
+
+            try {
+                while (true) {
+                    bucket = signSafeMod(bucket, capacity);
+
+                    V storedValue = values[bucket];
+
+                    if (value.equals(storedValue)) {
+                        return false;
+                    } else if (storedValue == EmptyValue) {
+                        // Found an empty bucket. This means the value is not in the set. If we've already seen a
+                        // deleted value, we should write at that position
+                        if (firstDeletedValue != -1) {
+                            bucket = firstDeletedValue;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        values[bucket] = value;
+                        ++size;
+                        return true;
+                    } else if (storedValue == DeletedValue) {
+                        // The bucket contained a deleted value
+                        if (firstDeletedValue == -1) {
+                            firstDeletedValue = bucket;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(V value, int keyHash) {
+            int bucket = keyHash;
+            long stamp = writeLock();
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    V storedValue = values[bucket];
+                    if (value.equals(storedValue)) {
+                        --size;
+
+                        int nextInArray = signSafeMod(bucket + 1, capacity);
+                        if (values[nextInArray] == EmptyValue) {
+                            values[bucket] = (V) EmptyValue;
+                            --usedBuckets;
+                        } else {
+                            values[bucket] = (V) DeletedValue;
+                        }
+
+                        return true;
+                    } else if (storedValue == EmptyValue) {
+                        // Value wasn't found
+                        return false;
+                    }
+
+                    ++bucket;
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(values, EmptyValue);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(Consumer<? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            int capacity = this.capacity;
+            V[] values = this.values;
+
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+
+                    capacity = this.capacity;
+                    values = this.values;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    V storedValue = values[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedValue = values[bucket];
+                    }
+
+                    if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                        processor.accept(storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            V[] newValues = (V[]) new Object[newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < values.length; i++) {
+                V storedValue = values[i];
+                if (storedValue != EmptyValue && storedValue != DeletedValue) {
+                    insertValueNoLock(newValues, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            values = newValues;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <V> void insertValueNoLock(V[] values, V value) {
+            int bucket = (int) hash(value);
+
+            while (true) {
+                bucket = signSafeMod(bucket, values.length);
+
+                V storedValue = values[bucket];
+
+                if (storedValue == EmptyValue) {
+                    // The bucket is empty, so we can use it
+                    values[bucket] = value;
+                    return;
+                }
+
+                ++bucket;
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final <K> long hash(K key) {
+        long hash = key.hashCode() * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
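+    // Capacity is always a power of two, so the bitwise AND implements the
+    // modulo and guarantees a non-negative index.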
+    static final int signSafeMod(long n, int Max) {
+        return (int) n & (Max - 1);
+    }
+
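+    // Round up to the next power of two, e.g. 100 -> 128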
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
new file mode 100644
index 0000000..44f42b8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongFunction;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ConcurrentLongHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongHashMap<String>(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashMap<String>(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashMap<String>(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        assertNull(map.put(2, "two"));
+        assertNull(map.put(3, "three"));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1), "one");
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.remove(1), "one");
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1), null);
+        assertEquals(map.get(5), null);
+        assertEquals(map.size(), 2);
+
+        assertNull(map.put(1, "one"));
+        assertEquals(map.size(), 3);
+        assertEquals(map.put(1, "uno"), "one");
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, "zero"));
+        assertFalse(map.remove(1, "uno"));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, "one"));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+        map.put(0, "zero");
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, "zero1");
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                    // Read the value back so that reads and writes interleave
+                    assertEquals(map.get(key), value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, "zero");
+
+        assertEquals(map.keys(), Lists.newArrayList(0L));
+        assertEquals(map.values(), Lists.newArrayList("zero"));
+
+        map.remove(0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, "zero");
+        map.put(1, "one");
+        map.put(2, "two");
+
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        List<String> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("one", "two", "zero"));
+
+        map.put(1, "uno");
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList("two", "uno", "zero"));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertEquals(map.put(key1, "value-1"), null);
+        assertEquals(map.put(key2, "value-2"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key1, "value-1-overwrite"), null);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), "value-1-overwrite");
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key2, "value-2-overwrite"), "value-2");
+        assertEquals(map.get(key2), "value-2-overwrite");
+
+        assertEquals(map.size(), 1);
+        assertEquals(map.remove(key2), "value-2-overwrite");
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        assertEquals(map.putIfAbsent(1, "one"), null);
+        assertEquals(map.get(1), "one");
+
+        assertEquals(map.putIfAbsent(1, "uno"), "one");
+        assertEquals(map.get(1), "one");
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+        AtomicInteger counter = new AtomicInteger();
+        LongFunction<Integer> provider = new LongFunction<Integer>() {
+            public Integer apply(long key) {
+                return counter.getAndIncrement();
+            }
+        };
+
+        assertEquals(map.computeIfAbsent(0, provider).intValue(), 0);
+        assertEquals(map.get(0).intValue(), 0);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(1, provider).intValue(), 1);
+        assertEquals(map.get(1).intValue(), 1);
+
+        assertEquals(map.computeIfAbsent(2, provider).intValue(), 2);
+        assertEquals(map.get(2).intValue(), 2);
+    }
+
+    final static int Iterations = 1;
+    final static int ReadIterations = 100;
+    final static int N = 1_000_000;
+
+    public void benchConcurrentLongHashMap() throws Exception {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put(j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get(j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove(j);
+            }
+        }
+    }
+
+    public void benchConcurrentHashMap() throws Exception {
+        ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<Long, String>(N, 0.66f, 1);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put((long) j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get((long) j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove((long) j);
+            }
+        }
+    }
+
+    void benchHashMap() throws Exception {
+        HashMap<Long, String> map = new HashMap<Long, String>(N, 0.66f);
+
+        for (long i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put((long) j, "value");
+            }
+
+            for (long h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get((long) j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove((long) j);
+            }
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
+
+        long start = System.nanoTime();
+        t.benchHashMap();
+        long end = System.nanoTime();
+
+        System.out.println("HM:   " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CHM:  " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentLongHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
new file mode 100644
index 0000000..5a8d904
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -0,0 +1,275 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ConcurrentLongHashSetTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongHashSet(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashSet(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongHashSet(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add(1));
+        assertFalse(set.isEmpty());
+
+        assertTrue(set.add(2));
+        assertTrue(set.add(3));
+
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.contains(1));
+        assertEquals(set.size(), 3);
+
+        assertTrue(set.remove(1));
+        assertEquals(set.size(), 2);
+        assertFalse(set.contains(1));
+        assertFalse(set.contains(5));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.add(1));
+        assertEquals(set.size(), 3);
+        assertFalse(set.add(1));
+        assertEquals(set.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+        assertTrue(set.isEmpty());
+        assertTrue(set.add(1));
+        assertFalse(set.isEmpty());
+
+        assertFalse(set.remove(0));
+        assertFalse(set.isEmpty());
+        assertTrue(set.remove(1));
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+        assertEquals(set.capacity(), n);
+        assertEquals(set.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            set.add(i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            set.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            set.add(i);
+        }
+
+        assertEquals(set.capacity(), 2 * n);
+        assertEquals(set.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    set.add(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(set.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashSet map = new ConcurrentLongHashSet();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.add(key);
+                    // Read back so that reads and writes interleave
+                    assertTrue(map.contains(key));
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+        assertEquals(set.items(), Collections.emptySet());
+
+        set.add(0L);
+
+        assertEquals(set.items(), Sets.newHashSet(0L));
+
+        set.remove(0L);
+
+        assertEquals(set.items(), Collections.emptySet());
+
+        set.add(0L);
+        set.add(1L);
+        set.add(2L);
+
+        List<Long> values = Lists.newArrayList(set.items());
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0L, 1L, 2L));
+
+        set.clear();
+        assertTrue(set.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongHashSet set = new ConcurrentLongHashSet(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongHashSet.signSafeMod(ConcurrentLongHashSet.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertTrue(set.add(key1));
+        assertTrue(set.add(key2));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertTrue(set.add(key1));
+        assertEquals(set.size(), 2);
+
+        assertTrue(set.remove(key1));
+        assertEquals(set.size(), 1);
+
+        assertFalse(set.add(key2));
+        assertTrue(set.contains(key2));
+
+        assertEquals(set.size(), 1);
+        assertTrue(set.remove(key2));
+        assertTrue(set.isEmpty());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
new file mode 100644
index 0000000..a7492a1
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class ConcurrentLongLongHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongLongHashMap(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongHashMap(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongHashMap(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16);
+
+        assertTrue(map.isEmpty());
+        assertEquals(map.put(1, 11), -1);
+        assertFalse(map.isEmpty());
+
+        assertEquals(map.put(2, 22), -1);
+        assertEquals(map.put(3, 33), -1);
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1), 11);
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.remove(1), 11);
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1), -1);
+        assertEquals(map.get(5), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.put(1, 11), -1);
+        assertEquals(map.size(), 3);
+        assertEquals(map.put(1, 111), 11);
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+        assertTrue(map.isEmpty());
+        assertEquals(map.put(1, 11), -1);
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                    // Read the value back so that reads and writes interleave
+                    assertEquals(map.get(key), value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(0L));
+        assertEquals(map.values(), Lists.newArrayList(0L));
+
+        map.remove(0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0);
+        map.put(1, 11);
+        map.put(2, 22);
+
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        List<Long> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0L, 11L, 22L));
+
+        map.put(1, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(0L, 1L, 2L));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(0L, 22L, 111L));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        final long value1 = 1;
+        final long value2 = 2;
+        final long value1Overwrite = 3;
+        final long value2Overwrite = 3;
+
+        assertEquals(map.put(key1, value1), -1);
+        assertEquals(map.put(key2, value2), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), value1);
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key1, value1Overwrite), -1);
+        assertEquals(map.size(), 2);
+
+        assertEquals(map.remove(key1), value1Overwrite);
+        assertEquals(map.size(), 1);
+
+        assertEquals(map.put(key2, value2Overwrite), value2);
+        assertEquals(map.get(key2), value2Overwrite);
+
+        assertEquals(map.size(), 1);
+        assertEquals(map.remove(key2), value2Overwrite);
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+        assertEquals(map.putIfAbsent(1, 11), -1);
+        assertEquals(map.get(1), 11);
+
+        assertEquals(map.putIfAbsent(1, 111), 11);
+        assertEquals(map.get(1), 11);
+    }
+
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+        AtomicLong counter = new AtomicLong();
+        LongLongFunction provider = new LongLongFunction() {
+            public long apply(long key) {
+                return counter.getAndIncrement();
+            }
+        };
+
+        assertEquals(map.computeIfAbsent(0, provider), 0);
+        assertEquals(map.get(0), 0);
+
+        assertEquals(map.computeIfAbsent(1, provider), 1);
+        assertEquals(map.get(1), 1);
+
+        assertEquals(map.computeIfAbsent(1, provider), 1);
+        assertEquals(map.get(1), 1);
+
+        assertEquals(map.computeIfAbsent(2, provider), 2);
+        assertEquals(map.get(2), 2);
+    }
+
+    @Test
+    public void testAddAndGet() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        assertEquals(map.addAndGet(0, 0), 0);
+        assertEquals(map.containsKey(0), true);
+        assertEquals(map.get(0), 0);
+
+        assertEquals(map.containsKey(5), false);
+
+        assertEquals(map.addAndGet(0, 5), 5);
+        assertEquals(map.get(0), 5);
+
+        assertEquals(map.addAndGet(0, 1), 6);
+        assertEquals(map.get(0), 6);
+
+        assertEquals(map.addAndGet(0, -2), 4);
+        assertEquals(map.get(0), 4);
+
+        // Cannot bring the value to negative
+        try {
+            map.addAndGet(0, -5);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+        assertEquals(map.get(0), 4);
+    }
+
+    @Test
+    public void testRemoveIf() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(1L, 1L);
+        map.put(2L, 2L);
+        map.put(3L, 3L);
+        map.put(4L, 4L);
+
+        map.removeIf(key -> key < 3);
+        assertFalse(map.containsKey(1L));
+        assertFalse(map.containsKey(2L));
+        assertTrue(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
+    @Test
+    public void testRemoveIfValue() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        map.put(1L, 1L);
+        map.put(2L, 2L);
+        map.put(3L, 1L);
+        map.put(4L, 2L);
+
+        map.removeIf((key, value) -> value < 2);
+        assertFalse(map.containsKey(1L));
+        assertTrue(map.containsKey(2L));
+        assertFalse(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
+    @Test
+    public void testInvalidKeys() {
+        ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+        try {
+            map.put(-5, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.computeIfAbsent(-1, new LongLongFunction() {
+                @Override
+                public long apply(long key) {
+                    return 1;
+                }
+            });
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongHashMap lmap = new ConcurrentLongLongHashMap(16, 1);
+        lmap.put(1, 11);
+        lmap.put(2, 22);
+        lmap.put(3, 33);
+
+        Map<Long, Long> map = Maps.newTreeMap();
+        map.put(1L, 11L);
+        map.put(2L, 22L);
+        map.put(3L, 33L);
+
+        assertEquals(map, lmap.asMap());
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
new file mode 100644
index 0000000..23aa327
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class ConcurrentLongLongPairHashMapTest {
+
+    @Test
+    public void testConstructor() {
+        try {
+            new ConcurrentLongLongPairHashMap(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongPairHashMap(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            new ConcurrentLongLongPairHashMap(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16);
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertTrue(map.put(2, 2, 22, 22));
+        assertTrue(map.put(3, 3, 33, 33));
+
+        assertEquals(map.size(), 3);
+
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+        assertEquals(map.size(), 3);
+
+        assertTrue(map.remove(1, 1));
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1, 1), null);
+        assertEquals(map.get(5, 5), null);
+        assertEquals(map.size(), 2);
+
+        assertTrue(map.put(1, 1, 11, 11));
+        assertEquals(map.size(), 3);
+        assertTrue(map.put(1, 1, 111, 111));
+        assertEquals(map.size(), 3);
+    }
+
+    @Test
+    public void testRemove() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+        assertTrue(map.isEmpty());
+        assertTrue(map.put(1, 1, 11, 11));
+        assertFalse(map.isEmpty());
+
+        assertFalse(map.remove(0, 0));
+        assertFalse(map.remove(1, 1, 111, 111));
+
+        assertFalse(map.isEmpty());
+        assertTrue(map.remove(1, 1, 11, 11));
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testNegativeUsedBucketCount() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+        map.put(0, 0, 0, 0);
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, 0, 1, 1);
+        assertEquals(1, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0, 0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+        assertEquals(map.capacity(), n);
+        assertEquals(map.size(), 0);
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i, i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i, i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i, i, i);
+        }
+
+        assertEquals(map.capacity(), 2 * n);
+        assertEquals(map.size(), n);
+    }
+
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        final long value = 55;
+
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random();
+
+                for (int j = 0; j < N; j++) {
+                    long key1 = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key1 -= key1 % (threadIdx + 1);
+
+                    long key2 = Math.abs(random.nextLong());
+                    // Ensure keys are unique
+                    key2 -= key2 % (threadIdx + 1);
+
+                    map.put(key1, key2, value, value);
+                    // Read the value back so that reads and writes interleave
+                    assertEquals(map.get(key1, key2), new LongPair(value, value));
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), N * nThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
+    public void testIteration() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+
+        assertEquals(map.keys(), Lists.newArrayList(new LongPair(0, 0)));
+        assertEquals(map.values(), Lists.newArrayList(new LongPair(0, 0)));
+
+        map.remove(0, 0);
+
+        assertEquals(map.keys(), Collections.emptyList());
+        assertEquals(map.values(), Collections.emptyList());
+
+        map.put(0, 0, 0, 0);
+        map.put(1, 1, 11, 11);
+        map.put(2, 2, 22, 22);
+
+        List<LongPair> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        List<LongPair> values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(11, 11), new LongPair(22, 22)));
+
+        map.put(1, 1, 111, 111);
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(keys, Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)));
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(values, Lists.newArrayList(new LongPair(0, 0), new LongPair(22, 22), new LongPair(111, 111)));
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+        assertTrue(map.putIfAbsent(1, 1, 11, 11));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+
+        assertFalse(map.putIfAbsent(1, 1, 111, 111));
+        assertEquals(map.get(1, 1), new LongPair(11, 11));
+    }
+
+    @Test
+    public void testInvalidKeys() {
+        ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+        try {
+            map.put(-5, 3, 4, 4);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.get(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.containsKey(-1, 0);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+
+        try {
+            map.putIfAbsent(-1, 1, 1, 1);
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void testAsMap() {
+        ConcurrentLongLongPairHashMap lmap = new ConcurrentLongLongPairHashMap(16, 1);
+        lmap.put(1, 1, 11, 11);
+        lmap.put(2, 2, 22, 22);
+        lmap.put(3, 3, 33, 33);
+
+        Map<LongPair, LongPair> map = Maps.newTreeMap();
+        map.put(new LongPair(1, 1), new LongPair(11, 11));
+        map.put(new LongPair(2, 2), new LongPair(22, 22));
+        map.put(new LongPair(3, 3), new LongPair(33, 33));
+
+        assertEquals(map, lmap.asMap());
+    }
+}


[3/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and sets for primitive types

Posted by si...@apache.org.
BOOKKEEPER-964: Add concurrent maps and sets for primitive types

In BookKeeper there are many instances of maps and sets that use ledger ids
and entry ids as keys or values. JDK concurrent collections have the overhead
of boxing every primitive value into an object (e.g. long --> Long) that has
to be allocated on the heap. In addition, JDK map implementations are
closed-addressing (chained) hash tables, so they require at least one more
allocation per entry to hold the linked-list/tree node.

There are libraries already available that offer zero-allocation primitive
collections, but none of them supports concurrent maps/sets.

We have added a handful of specializations, all based on the same
implementation idea: a hash table that is broken down into multiple sections.
Each section, on its own, is an open-addressing hash table with linear
probing, protected by a stamped lock.

All insertions, lookups and iterations on these collections are allocation free.

```
ConcurrentLongHashMap: Map<long, Object>
ConcurrentLongHashSet: Set<long>
ConcurrentLongLongHashMap: Map<long, long>
ConcurrentLongLongPairHashMap: Map< Pair<long, long>, Pair<long, long> >
ConcurrentOpenHashMap: Map<Object, Object>
ConcurrentOpenHashSet: Set<Object>
```
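
For illustration, a minimal usage sketch of two of the new classes (the
`UsageExample` class name is invented for this sketch; the collection APIs
are the ones added by this patch):

```
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;

// UsageExample is an illustrative name, not part of this patch
public class UsageExample {
    public static void main(String[] args) {
        // long -> Object map: the primitive key is never boxed
        ConcurrentLongHashMap<String> names = new ConcurrentLongHashMap<>();
        names.put(5L, "ledger-5");
        System.out.println(names.get(5L));   // ledger-5
        System.out.println(names.get(7L));   // null (key not present)

        // long -> long map: get() returns -1 for missing keys, and
        // negative keys are rejected with IllegalArgumentException
        ConcurrentLongLongHashMap sizes = new ConcurrentLongLongHashMap();
        sizes.put(5L, 1024L);
        System.out.println(sizes.get(5L));   // 1024
        System.out.println(sizes.get(7L));   // -1
    }
}
```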

Author: Matteo Merli <mm...@yahoo-inc.com>

Reviewers: Sijie Guo <si...@apache.org>, Enrico Olivelli <En...@diennea.com>

Closes #72 from merlimat/bk-collections


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/ecbb053e
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/ecbb053e
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/ecbb053e

Branch: refs/heads/master
Commit: ecbb053e6e873859507e247cae727f4bc8b9f7fa
Parents: 4cf0978
Author: Matteo Merli <mm...@yahoo-inc.com>
Authored: Tue Nov 29 15:17:46 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Nov 29 15:17:46 2016 -0800

----------------------------------------------------------------------
 .../util/collections/ConcurrentLongHashMap.java | 494 +++++++++++++
 .../util/collections/ConcurrentLongHashSet.java | 421 +++++++++++
 .../collections/ConcurrentLongLongHashMap.java  | 723 +++++++++++++++++++
 .../ConcurrentLongLongPairHashMap.java          | 550 ++++++++++++++
 .../util/collections/ConcurrentOpenHashMap.java | 493 +++++++++++++
 .../util/collections/ConcurrentOpenHashSet.java | 416 +++++++++++
 .../collections/ConcurrentLongHashMapTest.java  | 435 +++++++++++
 .../collections/ConcurrentLongHashSetTest.java  | 275 +++++++
 .../ConcurrentLongLongHashMapTest.java          | 473 ++++++++++++
 .../ConcurrentLongLongPairHashMapTest.java      | 343 +++++++++
 .../collections/ConcurrentOpenHashMapTest.java  | 488 +++++++++++++
 .../collections/ConcurrentOpenHashSetTest.java  | 318 ++++++++
 12 files changed, 5429 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
new file mode 100644
index 0000000..63603cb
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -0,0 +1,494 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.LongFunction;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Map from long to an Object.
+ * 
+ * Provides methods similar to those of a ConcurrentMap<long,Object>, with 2 differences:
+ * <ol>
+ * <li>No boxing/unboxing from long -> Long
+ * <li>Open hash map with linear probing, no node allocations to store the values
+ * </ol>
+ *
+ * @param <V> the type of values stored in the map
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentLongHashMap<V> {
+
+    private static final Object EmptyValue = null;
+    private static final Object DeletedValue = new Object();
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section<V>[] sections;
+
+    public ConcurrentLongHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = (Section<V>[]) new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section<>(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section<V> s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section<V> s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section<V> s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section<V> s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public V get(long key) {
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(long key) {
+        return get(key) != null;
+    }
+
+    public V put(long key, V value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public V putIfAbsent(long key, V value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public V computeIfAbsent(long key, LongFunction<V> provider) {
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, null, (int) h, true, provider);
+    }
+
+    public V remove(long key) {
+        long h = hash(key);
+        return getSection(h).remove(key, null, (int) h);
+    }
+
+    public boolean remove(long key, Object value) {
+        checkNotNull(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != null;
+    }
+
+    private Section<V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section<V> s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(EntryProcessor<V> processor) {
+        for (Section<V> s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<Long> keys() {
+        List<Long> keys = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    List<V> values() {
+        List<V> values = Lists.newArrayListWithExpectedSize((int) size());
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    public static interface EntryProcessor<V> {
+        void accept(long key, V value);
+    }
+
+    // A section is a portion of the hash map that is covered by a single lock
+    @SuppressWarnings("serial")
+    private static final class Section<V> extends StampedLock {
+        private long[] keys;
+        private V[] values;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.keys = new long[this.capacity];
+            this.values = (V[]) new Object[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+        }
+
+        V get(long key, int keyHash) {
+            int bucket = keyHash;
+
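+            // StampedLock optimistic-read pattern: read the bucket without blocking and validate the stamp
+            // afterwards; only fall back to a full read lock if a concurrent write invalidated the optimistic read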
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    // First try optimistic locking
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (storedKey == key) {
+                            return storedValue != DeletedValue ? storedValue : null;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+                            storedKey = keys[bucket];
+                            storedValue = values[bucket];
+                        }
+
+                        if (capacity != this.capacity) {
+                            // There has been a rehashing. We need to restart the search
+                            bucket = keyHash;
+                            continue;
+                        }
+
+                        if (storedKey == key) {
+                            return storedValue != DeletedValue ? storedValue : null;
+                        } else if (storedValue == EmptyValue) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        V put(long key, V value, int keyHash, boolean onlyIfAbsent, LongFunction<V> valueProvider) {
+            int bucket = keyHash;
+
+            long stamp = writeLock();
+            int capacity = this.capacity;
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    bucket = signSafeMod(bucket, capacity);
+
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (storedKey == key) {
+                        if (storedValue == EmptyValue) {
+                            values[bucket] = value != null ? value : valueProvider.apply(key);
+                            ++size;
+                            ++usedBuckets;
+                            return valueProvider != null ? values[bucket] : null;
+                        } else if (storedValue == DeletedValue) {
+                            values[bucket] = value != null ? value : valueProvider.apply(key);
+                            ++size;
+                            return valueProvider != null ? values[bucket] : null;
+                        } else if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            values[bucket] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedValue == EmptyValue) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        keys[bucket] = key;
+                        values[bucket] = value != null ? value : valueProvider.apply(key);
+                        ++size;
+                        return valueProvider != null ? values[bucket] : null;
+                    } else if (storedValue == DeletedValue) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    ++bucket;
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private V remove(long key, Object value, int keyHash) {
+            int bucket = keyHash;
+            long stamp = writeLock();
+
+            try {
+                while (true) {
+                    int capacity = this.capacity;
+                    bucket = signSafeMod(bucket, capacity);
+
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+                    if (storedKey == key) {
+                        if (value == null || value.equals(storedValue)) {
+                            if (storedValue == EmptyValue || storedValue == DeletedValue) {
+                                return null;
+                            }
+
+                            --size;
+                            V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+                            if (nextValueInArray == EmptyValue) {
+                                values[bucket] = (V) EmptyValue;
+                                --usedBuckets;
+                            } else {
+                                values[bucket] = (V) DeletedValue;
+                            }
+
+                            return storedValue;
+                        } else {
+                            return null;
+                        }
+                    } else if (storedValue == EmptyValue) {
+                        // Key wasn't found
+                        return null;
+                    }
+
+                    ++bucket;
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(keys, 0);
+                Arrays.fill(values, EmptyValue);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(EntryProcessor<V> processor) {
+            long stamp = tryOptimisticRead();
+
+            int capacity = this.capacity;
+            long[] keys = this.keys;
+            V[] values = this.values;
+
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+
+                    capacity = this.capacity;
+                    keys = this.keys;
+                    values = this.values;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = keys[bucket];
+                        storedValue = values[bucket];
+                    }
+
+                    if (storedValue != DeletedValue && storedValue != EmptyValue) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newKeys = new long[newCapacity];
+            V[] newValues = (V[]) new Object[newCapacity];
+
+            // Re-hash table
+            for (int i = 0; i < keys.length; i++) {
+                long storedKey = keys[i];
+                V storedValue = values[i];
+                if (storedValue != EmptyValue && storedValue != DeletedValue) {
+                    insertKeyValueNoLock(newKeys, newValues, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            keys = newKeys;
+            values = newValues;
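+            // Tombstones (DeletedValue) are dropped while re-inserting, so every used bucket now holds a live entry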
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static <V> void insertKeyValueNoLock(long[] keys, V[] values, long key, V value) {
+            int bucket = (int) hash(key);
+
+            while (true) {
+                bucket = signSafeMod(bucket, keys.length);
+
+                V storedValue = values[bucket];
+
+                if (storedValue == EmptyValue) {
+                    // The bucket is empty, so we can use it
+                    keys[bucket] = key;
+                    values[bucket] = value;
+                    return;
+                }
+
+                ++bucket;
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
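+    // Mixing function for the key (constant and shift as in the 64-bit MurmurHash2 mix): spreads the key's entropy
+    // across all 64 bits, so both the section (top 32 bits) and the bucket (low bits) are well distributed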
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int max) {
+        return (int) n & (max - 1);
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
new file mode 100644
index 0000000..d02b0bc
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -0,0 +1,421 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.StampedLock;
+
+/**
+ * Concurrent hash set for primitive longs.
+ *
+ * Provides similar methods as a ConcurrentSet&lt;Long&gt;, but since it's an open hash set with linear probing, no
+ * node allocations are required to store the items.
+ * <p>
+ * Items <strong>MUST</strong> be &gt;= 0.
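+ * <p>
+ * A minimal usage sketch (illustrative only, not part of the original patch):
+ * <pre>{@code
+ * ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+ * set.add(1L);      // returns true: item was added
+ * set.add(1L);      // returns false: already present
+ * set.contains(1L); // returns true
+ * set.remove(1L);   // returns true: item was removed
+ * }</pre>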
+ */
+public class ConcurrentLongHashSet {
+
+    private static final long EmptyItem = -1L;
+    private static final long DeletedItem = -2L;
+
+    private static final float SetFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public static interface ConsumerLong {
+        void accept(long item);
+    }
+
+    public ConcurrentLongHashSet() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongHashSet(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongHashSet(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / SetFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    public boolean contains(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).contains(item, (int) h);
+    }
+
+    public boolean add(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).add(item, (int) h);
+    }
+
+    /**
+     * Remove an existing item if found.
+     *
+     * @param item
+     *            the item to remove
+     * @return true if the item was removed, false if it was not present
+     */
+    public boolean remove(long item) {
+        checkBiggerEqualZero(item);
+        long h = hash(item);
+        return getSection(h).remove(item, (int) h);
+    }
+
+    private final Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(ConsumerLong processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new set with a copy of all the items
+     */
+    public Set<Long> items() {
+        Set<Long> items = new HashSet<>();
+        forEach(items::add);
+        return items;
+    }
+
+    // A section is a portion of the hash set that is covered by a single lock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Items are stored directly in the table array
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * SetFillFactor);
+            Arrays.fill(table, EmptyItem);
+        }
+
+        boolean contains(long item, int hash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(hash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedItem = table[bucket];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (item == storedItem) {
+                            return true;
+                        } else if (storedItem == EmptyItem) {
+                            // Not found
+                            return false;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(hash, capacity);
+                            storedItem = table[bucket];
+                        }
+
+                        if (item == storedItem) {
+                            return true;
+                        } else if (storedItem == EmptyItem) {
+                            // Not found
+                            return false;
+                        }
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean add(long item, int hash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(hash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedItem = -1;
+
+            try {
+                while (true) {
+                    long storedItem = table[bucket];
+
+                    if (item == storedItem) {
+                        // Item was already in set
+                        return false;
+                    } else if (storedItem == EmptyItem) {
+                        // Found an empty bucket. This means the item is not in the set. If we've already seen a
+                        // deleted item, we should write at that position
+                        if (firstDeletedItem != -1) {
+                            bucket = firstDeletedItem;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = item;
+                        ++size;
+                        return true;
+                    } else if (storedItem == DeletedItem) {
+                        // The bucket contained a different deleted item
+                        if (firstDeletedItem == -1) {
+                            firstDeletedItem = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long item, int hash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(hash, capacity);
+
+            try {
+                while (true) {
+                    long storedItem = table[bucket];
+                    if (item == storedItem) {
+                        --size;
+
+                        cleanBucket(bucket);
+                        return true;
+
+                    } else if (storedItem == EmptyItem) {
+                        // Item wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 1) & (table.length - 1);
+                }
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private void cleanBucket(int bucket) {
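+            // Lookups stop at the first empty bucket, so this bucket can only be freed when the next one is empty;
+            // otherwise leave a tombstone (DeletedItem) so that linear probing keeps scanning past it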
+            int nextInArray = (bucket + 1) & (table.length - 1);
+            if (table[nextInArray] == EmptyItem) {
+                table[bucket] = EmptyItem;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedItem;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyItem);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(ConsumerLong processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket++) {
+                    long storedItem = table[bucket];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedItem = table[bucket];
+                    }
+
+                    if (storedItem != DeletedItem && storedItem != EmptyItem) {
+                        processor.accept(storedItem);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[newCapacity];
+            Arrays.fill(newTable, EmptyItem);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i++) {
+                long storedItem = table[i];
+                if (storedItem != EmptyItem && storedItem != DeletedItem) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedItem);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * SetFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long item) {
+            int bucket = signSafeMod(hash(item), capacity);
+
+            while (true) {
+                long storedKey = table[bucket];
+
+                if (storedKey == EmptyItem) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = item;
+                    return;
+                }
+
+                bucket = (bucket + 1) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1));
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Items must be >= 0");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
new file mode 100644
index 0000000..7b5b5c2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -0,0 +1,723 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.LongPredicate;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Concurrent hash map from primitive long to long.
+ *
+ * Provides similar methods as a ConcurrentMap&lt;Long, Long&gt;, but since it's an open hash map with linear
+ * probing, no node allocations are required to store the keys and values, and no boxing is required.
+ * <p>
+ * Keys <strong>MUST</strong> be &gt;= 0.
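+ * <p>
+ * A minimal usage sketch (illustrative only, not part of the original patch):
+ * <pre>{@code
+ * ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+ * map.put(1L, 100L);     // returns -1: no previous value
+ * map.get(1L);           // returns 100
+ * map.get(2L);           // returns -1: key not present
+ * map.addAndGet(1L, 5L); // returns 105
+ * }</pre>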
+ */
+public class ConcurrentLongLongHashMap {
+
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public static interface BiConsumerLong {
+        void accept(long key, long value);
+    }
+
+    public static interface LongLongFunction {
+        long apply(long key);
+    }
+
+    public static interface LongLongPredicate {
+        boolean test(long key, long value);
+    }
+
+    public ConcurrentLongLongHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongLongHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongLongHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     * @param key
+     *            the key to look up
+     * @return the value or -1 if the key was not present
+     */
+    public long get(long key) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).get(key, (int) h);
+    }
+
+    public boolean containsKey(long key) {
+        return get(key) != ValueNotFound;
+    }
+
+    public long put(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, false, null);
+    }
+
+    public long putIfAbsent(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).put(key, value, (int) h, true, null);
+    }
+
+    public long computeIfAbsent(long key, LongLongFunction provider) {
+        checkBiggerEqualZero(key);
+        checkNotNull(provider);
+        long h = hash(key);
+        return getSection(h).put(key, ValueNotFound, (int) h, true, provider);
+    }
+
+    /**
+     * Atomically add the specified delta to a current value identified by the key. If the entry was not in the map, a
+     * new entry with default value 0 is added and then the delta is added.
+     *
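+     * For example (an illustrative note, not part of the original patch): calling {@code addAndGet(5, 3)} on a map
+     * that does not contain key 5 inserts the key with value 0 and adds 3, returning 3; a second identical call
+     * returns 6.
+     *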
+     * @param key
+     *            the entry key
+     * @param delta
+     *            the delta to add
+     * @return the new value of the entry
+     * @throws IllegalArgumentException
+     *             if the delta is invalid, such as when it would have caused the value to become &lt; 0
+     */
+    public long addAndGet(long key, long delta) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).addAndGet(key, delta, (int) h);
+    }
+
+    /**
+     * Change the value for a specific key only if it matches the current value.
+     *
+     * @param key
+     *            the entry key
+     * @param currentValue
+     *            the expected current value; -1 means the key must not be already present
+     * @param newValue
+     *            the new value to set
+     * @return true if the value was updated (or inserted, when currentValue is -1), false otherwise
+     */
+    public boolean compareAndSet(long key, long currentValue, long newValue) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(newValue);
+        long h = hash(key);
+        return getSection(h).compareAndSet(key, currentValue, newValue, (int) h);
+    }
+
+    /**
+     * Remove an existing entry if found.
+     *
+     * @param key
+     *            the entry key
+     * @return the value associated with the key or -1 if the key was not present
+     */
+    public long remove(long key) {
+        checkBiggerEqualZero(key);
+        long h = hash(key);
+        return getSection(h).remove(key, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key, long value) {
+        checkBiggerEqualZero(key);
+        checkBiggerEqualZero(value);
+        long h = hash(key);
+        return getSection(h).remove(key, value, (int) h) != ValueNotFound;
+    }
+
+    public int removeIf(LongPredicate filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    public int removeIf(LongLongPredicate filter) {
+        checkNotNull(filter);
+
+        int removedCount = 0;
+        for (Section s : sections) {
+            removedCount += s.removeIf(filter);
+        }
+
+        return removedCount;
+    }
+
+    private final Section getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
+    public void forEach(BiConsumerLong processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<Long> keys() {
+        List<Long> keys = Lists.newArrayList();
+        forEach((key, value) -> keys.add(key));
+        return keys;
+    }
+
+    public List<Long> values() {
+        List<Long> values = Lists.newArrayList();
+        forEach((key, value) -> values.add(value));
+        return values;
+    }
+
+    public Map<Long, Long> asMap() {
+        Map<Long, Long> map = Maps.newHashMap();
+        forEach((key, value) -> map.put(key, value));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single lock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
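+        // Layout: [key0, value0, key1, value1, ...]; bucket indexes returned by signSafeMod() are always even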
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[2 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        long get(long key, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key == storedKey) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return ValueNotFound;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey = table[bucket];
+                            storedValue = table[bucket + 1];
+                        }
+
+                        if (key == storedKey) {
+                            return storedValue;
+                        } else if (storedKey == EmptyKey) {
+                            // Not found
+                            return ValueNotFound;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        long put(long key, long value, int keyHash, boolean onlyIfAbsent, LongLongFunction valueProvider) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            table[bucket + 1] = value;
+                            return storedValue;
+                        } else {
+                            return storedValue;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        if (value == ValueNotFound) {
+                            value = valueProvider.apply(key);
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = value;
+                        ++size;
+                        return valueProvider != null ? value : ValueNotFound;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        long addAndGet(long key, long delta, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        // Key already present: add the delta to the existing value
+                        long newValue = storedValue + delta;
+                        checkBiggerEqualZero(newValue);
+
+                        table[bucket + 1] = newValue;
+                        return newValue;
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        checkBiggerEqualZero(delta);
+
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = delta;
+                        ++size;
+                        return delta;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        boolean compareAndSet(long key, long currentValue, long newValue, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (key == storedKey) {
+                        if (storedValue != currentValue) {
+                            return false;
+                        }
+
+                        // Overwrite the old value for the same key
+                        table[bucket + 1] = newValue;
+                        return true;
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map.
+                        if (currentValue == ValueNotFound) {
+                            if (firstDeletedKey != -1) {
+                                bucket = firstDeletedKey;
+                            } else {
+                                ++usedBuckets;
+                            }
+
+                            table[bucket] = key;
+                            table[bucket + 1] = newValue;
+                            ++size;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private long remove(long key, long value, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+                    if (key == storedKey) {
+                        if (value == ValueNotFound || value == storedValue) {
+                            --size;
+
+                            cleanBucket(bucket);
+                            return storedValue;
+                        } else {
+                            return ValueNotFound;
+                        }
+                    } else if (storedKey == EmptyKey) {
+                        // Key wasn't found
+                        return ValueNotFound;
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        int removeIf(LongPredicate filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        int removeIf(LongLongPredicate filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+                            cleanBucket(bucket);
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 2) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = ValueNotFound;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
+        public void forEach(BiConsumerLong processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                    long storedKey = table[bucket];
+                    long storedValue = table[bucket + 1];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey = table[bucket];
+                        storedValue = table[bucket + 1];
+                    }
+
+                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
+                        processor.accept(storedKey, storedValue);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[2 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 2) {
+                long storedKey = table[i];
+                long storedValue = table[i + 1];
+                if (storedKey != EmptyKey && storedKey != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key, long value) {
+            int bucket = signSafeMod(hash(key), capacity);
+
+            while (true) {
+                long storedKey = table[bucket];
+
+                if (storedKey == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key;
+                    table[bucket + 1] = value;
+                    return;
+                }
+
+                bucket = (bucket + 2) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    static final long hash(long key) {
+        long hash = key * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
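+    // Modulo the hash into the table, scaled by 2 since each logical bucket occupies two consecutive slots (key,
+    // value) of the interleaved table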
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 1;
+    }
+
+    private static final int alignToPowerOfTwo(int n) {
+        return (int) Math.pow(2, 32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
new file mode 100644
index 0000000..7677735
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -0,0 +1,550 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Concurrent hash map where both keys and values are composed of pairs of longs.
+ * <p>
+ * (long,long) --&gt; (long,long)
+ * <p>
+ * Provides similar methods as a ConcurrentMap&lt;K, V&gt;, but since it's an open hash map with linear probing, no
+ * node allocations are required to store the keys and values, and no boxing is required.
+ * <p>
+ * Keys <strong>MUST</strong> be &gt;= 0.
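+ * <p>
+ * A minimal usage sketch (illustrative only, not part of the original patch):
+ * <pre>{@code
+ * ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+ * map.put(1L, 2L, 10L, 20L);        // returns true
+ * LongPair value = map.get(1L, 2L); // returns (10, 20)
+ * map.remove(1L, 2L);               // returns true
+ * }</pre>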
+ */
+public class ConcurrentLongLongPairHashMap {
+
+    private static final long EmptyKey = -1L;
+    private static final long DeletedKey = -2L;
+
+    private static final long ValueNotFound = -1L;
+
+    private static final float MapFillFactor = 0.66f;
+
+    private static final int DefaultExpectedItems = 256;
+    private static final int DefaultConcurrencyLevel = 16;
+
+    private final Section[] sections;
+
+    public static interface BiConsumerLongPair {
+        void accept(long key1, long key2, long value1, long value2);
+    }
+
+    public static interface LongLongPairFunction {
+        long apply(long key1, long key2);
+    }
+
+    public static interface LongLongPairPredicate {
+        boolean test(long key1, long key2, long value1, long value2);
+    }
+
+    public ConcurrentLongLongPairHashMap() {
+        this(DefaultExpectedItems);
+    }
+
+    public ConcurrentLongLongPairHashMap(int expectedItems) {
+        this(expectedItems, DefaultConcurrencyLevel);
+    }
+
+    public ConcurrentLongLongPairHashMap(int expectedItems, int concurrencyLevel) {
+        checkArgument(expectedItems > 0);
+        checkArgument(concurrencyLevel > 0);
+        checkArgument(expectedItems >= concurrencyLevel);
+
+        int numSections = concurrencyLevel;
+        int perSectionExpectedItems = expectedItems / numSections;
+        int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+        this.sections = new Section[numSections];
+
+        for (int i = 0; i < numSections; i++) {
+            sections[i] = new Section(perSectionCapacity);
+        }
+    }
+
+    public long size() {
+        long size = 0;
+        for (Section s : sections) {
+            size += s.size;
+        }
+        return size;
+    }
+
+    public long capacity() {
+        long capacity = 0;
+        for (Section s : sections) {
+            capacity += s.capacity;
+        }
+        return capacity;
+    }
+
+    public boolean isEmpty() {
+        for (Section s : sections) {
+            if (s.size != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    long getUsedBucketCount() {
+        long usedBucketCount = 0;
+        for (Section s : sections) {
+            usedBucketCount += s.usedBuckets;
+        }
+        return usedBucketCount;
+    }
+
+    /**
+     * @param key1
+     *            the first component of the key
+     * @param key2
+     *            the second component of the key
+     * @return the value or null if the key was not present
+     */
+    public LongPair get(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).get(key1, key2, (int) h);
+    }
+
+    public boolean containsKey(long key1, long key2) {
+        return get(key1, key2) != null;
+    }
+
+    public boolean put(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, false);
+    }
+
+    public boolean putIfAbsent(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).put(key1, key2, value1, value2, (int) h, true);
+    }
+
+    /**
+     * Remove an existing entry if found.
+     *
+     * @param key1 first component of the key
+     * @param key2 second component of the key
+     * @return true if the entry was removed, false if the key was not present
+     */
+    public boolean remove(long key1, long key2) {
+        checkBiggerEqualZero(key1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, ValueNotFound, ValueNotFound, (int) h);
+    }
+
+    public boolean remove(long key1, long key2, long value1, long value2) {
+        checkBiggerEqualZero(key1);
+        checkBiggerEqualZero(value1);
+        long h = hash(key1, key2);
+        return getSection(h).remove(key1, key2, value1, value2, (int) h);
+    }
+
+    private final Section getSection(long hash) {
+        // Use the 32 most significant bits of the long hash to select the section
+        final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
+        return sections[sectionIdx];
+    }
+
+    public void clear() {
+        for (Section s : sections) {
+            s.clear();
+        }
+    }
+
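+    /**
+     * Iterate over all the entries in the map, invoking the processor on each one. A minimal caller sketch
+     * (the lambda parameter names are illustrative):
+     *
+     * <pre>
+     * map.forEach((key1, key2, value1, value2) -&gt; System.out.println(key1 + "," + key2));
+     * </pre>
+     */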
+    public void forEach(BiConsumerLongPair processor) {
+        for (Section s : sections) {
+            s.forEach(processor);
+        }
+    }
+
+    /**
+     * @return a new list of all keys (makes a copy)
+     */
+    public List<LongPair> keys() {
+        List<LongPair> keys = Lists.newArrayList();
+        forEach((key1, key2, value1, value2) -> keys.add(new LongPair(key1, key2)));
+        return keys;
+    }
+
+    public List<LongPair> values() {
+        List<LongPair> values = Lists.newArrayList();
+        forEach((key1, key2, value1, value2) -> values.add(new LongPair(value1, value2)));
+        return values;
+    }
+
+    public Map<LongPair, LongPair> asMap() {
+        Map<LongPair, LongPair> map = Maps.newHashMap();
+        forEach((key1, key2, value1, value2) -> map.put(new LongPair(key1, key2), new LongPair(value1, value2)));
+        return map;
+    }
+
+    // A section is a portion of the hash map that is covered by a single StampedLock
+    @SuppressWarnings("serial")
+    private static final class Section extends StampedLock {
+        // Keys and values are stored interleaved in the table array
+        private long[] table;
+
+        private int capacity;
+        private volatile int size;
+        private int usedBuckets;
+        private int resizeThreshold;
+
+        Section(int capacity) {
+            this.capacity = alignToPowerOfTwo(capacity);
+            this.table = new long[4 * this.capacity];
+            this.size = 0;
+            this.usedBuckets = 0;
+            this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+            Arrays.fill(table, EmptyKey);
+        }
+
+        LongPair get(long key1, long key2, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean acquiredLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    // First try optimistic locking
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredLock && validate(stamp)) {
+                        // The values we have read are consistent
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    } else {
+                        // Fallback to acquiring read lock
+                        if (!acquiredLock) {
+                            stamp = readLock();
+                            acquiredLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey1 = table[bucket];
+                            storedKey2 = table[bucket + 1];
+                            storedValue1 = table[bucket + 2];
+                            storedValue2 = table[bucket + 3];
+                        }
+
+                        if (key1 == storedKey1 && key2 == storedKey2) {
+                            return new LongPair(storedValue1, storedValue2);
+                        } else if (storedKey1 == EmptyKey) {
+                            // Not found
+                            return null;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (acquiredLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean onlyIfAbsent) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember the first deleted bucket we pass over, so the new entry can reuse that slot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            table[bucket + 2] = value1;
+                            table[bucket + 3] = value2;
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map. If we've already seen a deleted
+                        // key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key1;
+                        table[bucket + 1] = key2;
+                        table[bucket + 2] = value1;
+                        table[bucket + 3] = value2;
+                        ++size;
+                        return true;
+                    } else if (storedKey1 == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+            } finally {
+                if (usedBuckets > resizeThreshold) {
+                    try {
+                        rehash();
+                    } finally {
+                        unlockWrite(stamp);
+                    }
+                } else {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+        private boolean remove(long key1, long key2, long value1, long value2, int keyHash) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                while (true) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+                    if (key1 == storedKey1 && key2 == storedKey2) {
+                        if (value1 == ValueNotFound || (value1 == storedValue1 && value2 == storedValue2)) {
+                            --size;
+
+                            cleanBucket(bucket);
+                            return true;
+                        } else {
+                            return false;
+                        }
+                    } else if (storedKey1 == EmptyKey) {
+                        // Key wasn't found
+                        return false;
+                    }
+
+                    bucket = (bucket + 4) & (table.length - 1);
+                }
+
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
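+        // Deletion in an open-addressed table: if the following bucket is empty, no probe chain continues through
+        // this bucket, so it can be marked empty and the slot reclaimed; otherwise a DeletedKey tombstone must be
+        // left behind so that lookups keep probing past it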
+        private void cleanBucket(int bucket) {
+            int nextInArray = (bucket + 4) & (table.length - 1);
+            if (table[nextInArray] == EmptyKey) {
+                table[bucket] = EmptyKey;
+                table[bucket + 1] = EmptyKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+                --usedBuckets;
+            } else {
+                table[bucket] = DeletedKey;
+                table[bucket + 1] = DeletedKey;
+                table[bucket + 2] = ValueNotFound;
+                table[bucket + 3] = ValueNotFound;
+            }
+        }
+
+        void clear() {
+            long stamp = writeLock();
+
+            try {
+                Arrays.fill(table, EmptyKey);
+                this.size = 0;
+                this.usedBuckets = 0;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
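+        // Each bucket is read optimistically and then validated; if a concurrent write invalidates the stamp, we
+        // fall back to holding the read lock for the remainder of the scan. Individual entries are therefore read
+        // consistently, but the traversal as a whole is not atomic.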
+        public void forEach(BiConsumerLongPair processor) {
+            long stamp = tryOptimisticRead();
+
+            long[] table = this.table;
+            boolean acquiredReadLock = false;
+
+            try {
+
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    acquiredReadLock = true;
+                    table = this.table;
+                }
+
+                // Go through all the buckets for this section
+                for (int bucket = 0; bucket < table.length; bucket += 4) {
+                    long storedKey1 = table[bucket];
+                    long storedKey2 = table[bucket + 1];
+                    long storedValue1 = table[bucket + 2];
+                    long storedValue2 = table[bucket + 3];
+
+                    if (!acquiredReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock
+                        stamp = readLock();
+                        acquiredReadLock = true;
+
+                        storedKey1 = table[bucket];
+                        storedKey2 = table[bucket + 1];
+                        storedValue1 = table[bucket + 2];
+                        storedValue2 = table[bucket + 3];
+                    }
+
+                    if (storedKey1 != DeletedKey && storedKey1 != EmptyKey) {
+                        processor.accept(storedKey1, storedKey2, storedValue1, storedValue2);
+                    }
+                }
+            } finally {
+                if (acquiredReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
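+        // Invoked from put() while the section's write lock is still held, so the table can be swapped safely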
+        private void rehash() {
+            // Expand the hashmap
+            int newCapacity = capacity * 2;
+            long[] newTable = new long[4 * newCapacity];
+            Arrays.fill(newTable, EmptyKey);
+
+            // Re-hash table
+            for (int i = 0; i < table.length; i += 4) {
+                long storedKey1 = table[i];
+                long storedKey2 = table[i + 1];
+                long storedValue1 = table[i + 2];
+                long storedValue2 = table[i + 3];
+                if (storedKey1 != EmptyKey && storedKey1 != DeletedKey) {
+                    insertKeyValueNoLock(newTable, newCapacity, storedKey1, storedKey2, storedValue1, storedValue2);
+                }
+            }
+
+            capacity = newCapacity;
+            table = newTable;
+            usedBuckets = size;
+            resizeThreshold = (int) (capacity * MapFillFactor);
+        }
+
+        private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1,
+                long value2) {
+            int bucket = signSafeMod(hash(key1, key2), capacity);
+
+            while (true) {
+                long storedKey1 = table[bucket];
+
+                if (storedKey1 == EmptyKey) {
+                    // The bucket is empty, so we can use it
+                    table[bucket] = key1;
+                    table[bucket + 1] = key2;
+                    table[bucket + 2] = value1;
+                    table[bucket + 3] = value2;
+                    return;
+                }
+
+                bucket = (bucket + 4) & (table.length - 1);
+            }
+        }
+    }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
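+    // Murmur-style 64-bit mixing of the two key components (the multiply constant and the shift of 47 match the
+    // ones used by the 64-bit variant of MurmurHash2)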
+    static final long hash(long key1, long key2) {
+        long hash = key1 * HashMixer;
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        hash += 31 + (key2 * HashMixer);
+        hash ^= hash >>> R;
+        hash *= HashMixer;
+        return hash;
+    }
+
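+    // Reduce a hash to a bucket's base index in the table array: mask into [0, max) (max is a power of two) and
+    // shift left by 2, since each bucket occupies 4 consecutive longs (key1, key2, value1, value2)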
+    static final int signSafeMod(long n, int max) {
+        return (int) (n & (max - 1)) << 2;
+    }
+
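+    // Round up to the next power of two, so that capacity - 1 can be used as a bit mask in signSafeMod()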
+    private static final int alignToPowerOfTwo(int n) {
+        return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+
+    private static final void checkBiggerEqualZero(long n) {
+        if (n < 0L) {
+            throw new IllegalArgumentException("Keys and values must be >= 0");
+        }
+    }
+
+    public static class LongPair implements Comparable<LongPair> {
+        public final long first;
+        public final long second;
+
+        public LongPair(long first, long second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof LongPair) {
+                LongPair other = (LongPair) obj;
+                return first == other.first && second == other.second;
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) hash(first, second);
+        }
+
+        @Override
+        public int compareTo(LongPair o) {
+            if (first != o.first) {
+                return Long.compare(first, o.first);
+            } else {
+                return Long.compare(second, o.second);
+            }
+        }
+    }
+}