You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/11/29 23:18:01 UTC
[2/3] bookkeeper git commit: BOOKKEEPER-964: Add concurrent maps and
sets for primitive types
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
new file mode 100644
index 0000000..90fc548
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -0,0 +1,493 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash map.
+ *
+ * Provides methods similar to those of a ConcurrentMap<K,V>, but since it is an open hash map with linear probing,
+ * no node allocations are required to store the values.
+ *
+ * @param <K> type of the keys
+ * @param <V> type of the values
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashMap<K, V> {
+
+ private static final Object EmptyKey = null;
+ private static final Object DeletedKey = new Object();
+
+ private static final float MapFillFactor = 0.66f;
+
+ private static final int DefaultExpectedItems = 256;
+ private static final int DefaultConcurrencyLevel = 16;
+
+ private final Section<K, V>[] sections;
+
+ public ConcurrentOpenHashMap() {
+ this(DefaultExpectedItems);
+ }
+
+ public ConcurrentOpenHashMap(int expectedItems) {
+ this(expectedItems, DefaultConcurrencyLevel);
+ }
+
+ public ConcurrentOpenHashMap(int expectedItems, int concurrencyLevel) {
+ checkArgument(expectedItems > 0);
+ checkArgument(concurrencyLevel > 0);
+ checkArgument(expectedItems >= concurrencyLevel);
+
+ int numSections = concurrencyLevel;
+ int perSectionExpectedItems = expectedItems / numSections;
+ int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+ this.sections = (Section<K, V>[]) new Section[numSections];
+
+ for (int i = 0; i < numSections; i++) {
+ sections[i] = new Section<>(perSectionCapacity);
+ }
+ }
+
+    /**
+     * @return the sum of the sizes of all sections; sections are read one after another without a common lock
+     */
+    public long size() {
+        long total = 0;
+        for (int idx = 0; idx < sections.length; idx++) {
+            total += sections[idx].size;
+        }
+        return total;
+    }
+
+    /**
+     * @return the sum of the bucket capacities of all sections
+     */
+    public long capacity() {
+        long total = 0;
+        for (int idx = 0; idx < sections.length; idx++) {
+            total += sections[idx].capacity;
+        }
+        return total;
+    }
+
+ public boolean isEmpty() {
+ for (Section<K, V> s : sections) {
+ if (s.size != 0) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+    /**
+     * Looks up the value stored for a key.
+     *
+     * @param key the key to look up; must not be null
+     * @return the stored value, or null if the key is not present
+     */
+    public V get(K key) {
+        checkNotNull(key);
+        final long keyHash = hash(key);
+        final Section<K, V> section = getSection(keyHash);
+        return section.get(key, (int) keyHash);
+    }
+
+    /**
+     * @param key the key to look up; must not be null
+     * @return true if {@link #get(Object)} returns a non-null value for the key
+     */
+    public boolean containsKey(K key) {
+        final V current = get(key);
+        return current != null;
+    }
+
+    /**
+     * Stores a value for a key, replacing any existing value.
+     *
+     * @param key   the key; must not be null
+     * @param value the value; must not be null
+     * @return the value previously stored for the key, or null if there was none
+     */
+    public V put(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        final long keyHash = hash(key);
+        final Section<K, V> section = getSection(keyHash);
+        return section.put(key, value, (int) keyHash, false, null);
+    }
+
+    /**
+     * Stores a value for a key only if the key is not already present.
+     *
+     * @param key   the key; must not be null
+     * @param value the value; must not be null
+     * @return the value already stored for the key, or null if the new value was inserted
+     */
+    public V putIfAbsent(K key, V value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        final long keyHash = hash(key);
+        final Section<K, V> section = getSection(keyHash);
+        return section.put(key, value, (int) keyHash, true, null);
+    }
+
+    /**
+     * Returns the value stored for a key, asking {@code provider} for a value to insert when the key is absent.
+     *
+     * @param key      the key; must not be null
+     * @param provider function invoked with the key when no value is stored for it; must not be null
+     * @return the existing value, or the value obtained from the provider
+     */
+    public V computeIfAbsent(K key, Function<K, V> provider) {
+        checkNotNull(key);
+        checkNotNull(provider);
+        final long keyHash = hash(key);
+        final Section<K, V> section = getSection(keyHash);
+        return section.put(key, null, (int) keyHash, true, provider);
+    }
+
+    /**
+     * Removes the entry for a key.
+     *
+     * @param key the key; must not be null
+     * @return the value that was removed, or null if the key was not present
+     */
+    public V remove(K key) {
+        checkNotNull(key);
+        final long keyHash = hash(key);
+        final Section<K, V> section = getSection(keyHash);
+        return section.remove(key, null, (int) keyHash);
+    }
+
+    /**
+     * Removes the entry for a key only if it currently maps to the given value.
+     *
+     * @param key   the key; must not be null
+     * @param value the value the key is expected to map to; must not be null
+     * @return true if an entry was removed
+     */
+    public boolean remove(K key, Object value) {
+        checkNotNull(key);
+        checkNotNull(value);
+        final long keyHash = hash(key);
+        final V removed = getSection(keyHash).remove(key, value, (int) keyHash);
+        return removed != null;
+    }
+
+    /**
+     * Picks the section responsible for a hash.
+     *
+     * The section count equals the caller-supplied concurrency level, which is not required to be a power of two.
+     * A bit mask would then leave some sections unreachable (with 3 sections the mask is {@code 0b10}, so index 1
+     * is never selected). {@link Math#floorMod(int, int)} yields the same index as the mask for power-of-two counts
+     * and spreads over all sections otherwise.
+     *
+     * @param hash the 64-bit hash of the key
+     * @return the section that stores keys with this hash
+     */
+    private Section<K, V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = Math.floorMod((int) (hash >>> 32), sections.length);
+        return sections[sectionIdx];
+    }
+
+    /**
+     * Clears every section, one after another.
+     */
+    public void clear() {
+        for (int idx = 0; idx < sections.length; idx++) {
+            sections[idx].clear();
+        }
+    }
+
+    /**
+     * Passes every key/value pair to {@code processor}, visiting the sections in order.
+     *
+     * @param processor callback invoked with each key and its value
+     */
+    public void forEach(BiConsumer<? super K, ? super V> processor) {
+        for (int idx = 0; idx < sections.length; idx++) {
+            sections[idx].forEach(processor);
+        }
+    }
+
+    /**
+     * Removes every entry accepted by {@code filter}.
+     *
+     * @param filter predicate tested against each key/value pair; must not be null
+     * @return the number of entries removed across all sections
+     */
+    public int removeIf(BiPredicate<K, V> filter) {
+        checkNotNull(filter);
+
+        int total = 0;
+        for (int idx = 0; idx < sections.length; idx++) {
+            total += sections[idx].removeIf(filter);
+        }
+        return total;
+    }
+
+    /**
+     * @return a newly allocated list holding every key passed to the callback by {@link #forEach}
+     */
+    public List<K> keys() {
+        final List<K> collected = Lists.newArrayList();
+        forEach((k, ignoredValue) -> collected.add(k));
+        return collected;
+    }
+
+    /**
+     * @return a newly allocated list holding every value passed to the callback by {@link #forEach}
+     */
+    public List<V> values() {
+        final List<V> collected = Lists.newArrayList();
+        forEach((ignoredKey, v) -> collected.add(v));
+        return collected;
+    }
+
+ // A section is a portion of the hash map that is covered by a single lock
+ @SuppressWarnings("serial")
+ private static final class Section<K, V> extends StampedLock {
+ // Keys and values are stored interleaved in the table array
+ private Object[] table;
+
+ private int capacity;
+ private volatile int size;
+ private int usedBuckets;
+ private int resizeThreshold;
+
+ Section(int capacity) {
+ this.capacity = alignToPowerOfTwo(capacity);
+ this.table = new Object[2 * this.capacity];
+ this.size = 0;
+ this.usedBuckets = 0;
+ this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ }
+
+        /**
+         * Probes the table for a key, first under an optimistic stamp and, once that stamp fails validation, under
+         * the read lock.
+         *
+         * @param key     the key to look up
+         * @param keyHash the hash of the key
+         * @return the stored value, or null if an empty bucket is reached first
+         */
+        V get(K key, int keyHash) {
+            long stamp = tryOptimisticRead();
+            boolean holdsReadLock = false;
+            int bucket = signSafeMod(keyHash, capacity);
+
+            try {
+                for (;; bucket = (bucket + 2) & (table.length - 1)) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (holdsReadLock || !validate(stamp)) {
+                        if (!holdsReadLock) {
+                            // The optimistic read was invalidated: take the read lock and restart the probe
+                            stamp = readLock();
+                            holdsReadLock = true;
+
+                            bucket = signSafeMod(keyHash, capacity);
+                            storedKey = (K) table[bucket];
+                            storedValue = (V) table[bucket + 1];
+                        }
+                    }
+
+                    // At this point the pair is either validated or was read under the lock
+                    if (key.equals(storedKey)) {
+                        return storedValue;
+                    }
+                    if (storedKey == EmptyKey) {
+                        // Not found
+                        return null;
+                    }
+                }
+            } finally {
+                if (holdsReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+        /**
+         * Inserts or updates an entry while holding the write lock, growing the table afterwards when the number of
+         * used buckets exceeds the resize threshold.
+         *
+         * @param key           the key
+         * @param value         the value to store, or null to obtain it from {@code valueProvider} on insertion
+         * @param keyHash       the hash of the key
+         * @param onlyIfAbsent  when true an existing value is left untouched
+         * @param valueProvider supplies the value when {@code value} is null and the key is absent; may be null
+         *                      when {@code value} is given
+         * @return for an existing key, the value that was stored before the call; for a new key, the inserted
+         *         value when a provider was given and null otherwise. Null is also returned, and nothing is
+         *         inserted, when the provider itself returns null.
+         */
+        V put(K key, V value, int keyHash, boolean onlyIfAbsent, Function<K, V> valueProvider) {
+            long stamp = writeLock();
+            int bucket = signSafeMod(keyHash, capacity);
+
+            // Remember where we find the first available spot
+            int firstDeletedKey = -1;
+
+            try {
+                while (true) {
+                    K storedKey = (K) table[bucket];
+                    V storedValue = (V) table[bucket + 1];
+
+                    if (key.equals(storedKey)) {
+                        if (!onlyIfAbsent) {
+                            // Overwrite the old value for the same key
+                            table[bucket + 1] = value;
+                        }
+                        return storedValue;
+                    } else if (storedKey == EmptyKey) {
+                        // Found an empty bucket. This means the key is not in the map.
+                        if (value == null) {
+                            // Run the provider before any counter is touched, so that a provider that throws
+                            // leaves usedBuckets and size consistent with the table contents.
+                            value = valueProvider.apply(key);
+                            if (value == null) {
+                                // A key without a value would count towards size while get() reports it absent
+                                return null;
+                            }
+                        }
+
+                        // If we've already seen a deleted key, we should write at that position
+                        if (firstDeletedKey != -1) {
+                            bucket = firstDeletedKey;
+                        } else {
+                            ++usedBuckets;
+                        }
+
+                        table[bucket] = key;
+                        table[bucket + 1] = value;
+                        ++size;
+                        return valueProvider != null ? value : null;
+                    } else if (storedKey == DeletedKey) {
+                        // The bucket contained a different deleted key
+                        if (firstDeletedKey == -1) {
+                            firstDeletedKey = bucket;
+                        }
+                    }
+
+                    bucket = (bucket + 2) & (table.length - 1);
+                }
+            } finally {
+                try {
+                    if (usedBuckets > resizeThreshold) {
+                        rehash();
+                    }
+                } finally {
+                    unlockWrite(stamp);
+                }
+            }
+        }
+
+ private V remove(K key, Object value, int keyHash) {
+ long stamp = writeLock();
+ int bucket = signSafeMod(keyHash, capacity);
+
+ try {
+ while (true) {
+ K storedKey = (K) table[bucket];
+ V storedValue = (V) table[bucket + 1];
+ if (key.equals(storedKey)) {
+ if (value == null || value.equals(storedValue)) {
+ --size;
+ cleanBucket(bucket);
+ return storedValue;
+ } else {
+ return null;
+ }
+ } else if (storedKey == EmptyKey) {
+ // Key wasn't found
+ return null;
+ }
+
+ bucket = (bucket + 2) & (table.length - 1);
+ }
+
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+ void clear() {
+ long stamp = writeLock();
+
+ try {
+ Arrays.fill(table, EmptyKey);
+ this.size = 0;
+ this.usedBuckets = 0;
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+        /**
+         * Visits every live entry of this section. Iteration starts under an optimistic stamp; the first time the
+         * stamp fails validation the read lock is taken and held until the iteration ends.
+         *
+         * @param processor callback invoked with each key and its value
+         */
+        public void forEach(BiConsumer<? super K, ? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            Object[] snapshot = this.table;
+            boolean holdsReadLock = false;
+
+            try {
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    holdsReadLock = true;
+                    snapshot = this.table;
+                }
+
+                // Go through all the buckets for this section
+                int bucket = 0;
+                while (bucket < snapshot.length) {
+                    K storedKey = (K) snapshot[bucket];
+                    V storedValue = (V) snapshot[bucket + 1];
+
+                    if (!holdsReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock and re-read the pair under it
+                        stamp = readLock();
+                        holdsReadLock = true;
+
+                        storedKey = (K) snapshot[bucket];
+                        storedValue = (V) snapshot[bucket + 1];
+                    }
+
+                    final boolean vacant = storedKey == DeletedKey || storedKey == EmptyKey;
+                    if (!vacant) {
+                        processor.accept(storedKey, storedValue);
+                    }
+
+                    bucket += 2;
+                }
+            } finally {
+                if (holdsReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+ int removeIf(BiPredicate<K, V> filter) {
+ long stamp = writeLock();
+
+ int removedCount = 0;
+ try {
+ // Go through all the buckets for this section
+ for (int bucket = 0; bucket < table.length; bucket += 2) {
+ K storedKey = (K) table[bucket];
+ V storedValue = (V) table[bucket + 1];
+
+ if (storedKey != DeletedKey && storedKey != EmptyKey) {
+ if (filter.test(storedKey, storedValue)) {
+ // Removing item
+ --size;
+ ++removedCount;
+ cleanBucket(bucket);
+ }
+ }
+ }
+
+ return removedCount;
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+        /**
+         * Releases a bucket. When the following bucket is empty this bucket is marked empty as well and no longer
+         * counts as used; otherwise it is marked deleted so that probes continue past it.
+         *
+         * @param bucket index of the key slot to release
+         */
+        private final void cleanBucket(int bucket) {
+            final int following = (bucket + 2) & (table.length - 1);
+            final boolean followedByEmpty = table[following] == EmptyKey;
+
+            table[bucket] = followedByEmpty ? EmptyKey : DeletedKey;
+            table[bucket + 1] = null;
+            if (followedByEmpty) {
+                --usedBuckets;
+            }
+        }
+
+ private void rehash() {
+ // Expand the hashmap
+ int newCapacity = capacity * 2;
+ Object[] newTable = new Object[2 * newCapacity];
+
+ // Re-hash table
+ for (int i = 0; i < table.length; i += 2) {
+ K storedKey = (K) table[i];
+ V storedValue = (V) table[i + 1];
+ if (storedKey != EmptyKey && storedKey != DeletedKey) {
+ insertKeyValueNoLock(newTable, newCapacity, storedKey, storedValue);
+ }
+ }
+
+ capacity = newCapacity;
+ table = newTable;
+ usedBuckets = size;
+ resizeThreshold = (int) (capacity * MapFillFactor);
+ }
+
+        /**
+         * Writes a key/value pair into the first empty bucket found by linear probing from the key's home bucket.
+         * Performs no locking and no check for an existing key.
+         *
+         * @param table    interleaved key/value array to insert into
+         * @param capacity number of buckets in {@code table}
+         * @param key      the key to insert
+         * @param value    the value to insert
+         */
+        private static <K, V> void insertKeyValueNoLock(Object[] table, int capacity, K key, V value) {
+            int bucket = signSafeMod(hash(key), capacity);
+
+            // Skip occupied buckets until an empty one is found
+            while (table[bucket] != EmptyKey) {
+                bucket = (bucket + 2) & (table.length - 1);
+            }
+
+            table[bucket] = key;
+            table[bucket + 1] = value;
+        }
+ }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    /**
+     * Mixes {@code key.hashCode()} into a 64-bit hash: multiply by {@code HashMixer}, fold the upper bits down
+     * with an unsigned shift by {@code R}, then multiply again.
+     *
+     * @param key the key to hash; must not be null
+     * @return the 64-bit hash
+     */
+    final static <K> long hash(K key) {
+        long mixed = key.hashCode() * HashMixer;
+        mixed = (mixed ^ (mixed >>> R)) * HashMixer;
+        return mixed;
+    }
+
+    /**
+     * Maps a hash to the index of a key slot in the interleaved table.
+     *
+     * @param n   the hash
+     * @param Max the number of buckets; the mask assumes a power of two
+     * @return {@code n} masked with {@code Max - 1}, doubled because each bucket occupies two array slots
+     */
+    static final int signSafeMod(long n, int Max) {
+        final int bucketNumber = (int) (n & (Max - 1));
+        return bucketNumber << 1;
+    }
+
+    /**
+     * Rounds {@code n} up to the nearest power of two using integer arithmetic.
+     *
+     * The floating-point {@code Math.pow} form saturates to {@code Integer.MAX_VALUE}, which is not a power of
+     * two, once the result no longer fits in an int. That silently breaks the bucket masks. Out-of-range input is
+     * rejected here instead.
+     *
+     * @param n the value to round up; must be in the range [1, 2^30]
+     * @return the smallest power of two that is greater than or equal to {@code n}
+     * @throws IllegalArgumentException if {@code n} is outside the supported range
+     */
+    private static final int alignToPowerOfTwo(int n) {
+        if (n <= 0 || n > (1 << 30)) {
+            throw new IllegalArgumentException("Cannot align " + n + " to a power of two that fits in an int");
+        }
+        return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
new file mode 100644
index 0000000..99a552d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -0,0 +1,416 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Concurrent hash set.
+ *
+ * Provides methods similar to those of a Set<V>, but since it is an open hash set with linear probing, no node
+ * allocations are required to store the values.
+ *
+ * @param <V> type of the values
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentOpenHashSet<V> {
+
+ private static final Object EmptyValue = null;
+ private static final Object DeletedValue = new Object();
+
+ private static final float MapFillFactor = 0.66f;
+
+ private static final int DefaultExpectedItems = 256;
+ private static final int DefaultConcurrencyLevel = 16;
+
+ private final Section<V>[] sections;
+
+ public ConcurrentOpenHashSet() {
+ this(DefaultExpectedItems);
+ }
+
+ public ConcurrentOpenHashSet(int expectedItems) {
+ this(expectedItems, DefaultConcurrencyLevel);
+ }
+
+ public ConcurrentOpenHashSet(int expectedItems, int concurrencyLevel) {
+ checkArgument(expectedItems > 0);
+ checkArgument(concurrencyLevel > 0);
+ checkArgument(expectedItems >= concurrencyLevel);
+
+ int numSections = concurrencyLevel;
+ int perSectionExpectedItems = expectedItems / numSections;
+ int perSectionCapacity = (int) (perSectionExpectedItems / MapFillFactor);
+ this.sections = (Section<V>[]) new Section[numSections];
+
+ for (int i = 0; i < numSections; i++) {
+ sections[i] = new Section<>(perSectionCapacity);
+ }
+ }
+
+    /**
+     * @return the sum of the sizes of all sections; sections are read one after another without a common lock
+     */
+    public long size() {
+        long total = 0;
+        for (int idx = 0; idx < sections.length; idx++) {
+            total += sections[idx].size;
+        }
+        return total;
+    }
+
+    /**
+     * @return the sum of the bucket capacities of all sections
+     */
+    public long capacity() {
+        long total = 0;
+        for (int idx = 0; idx < sections.length; idx++) {
+            total += sections[idx].capacity;
+        }
+        return total;
+    }
+
+ public boolean isEmpty() {
+ for (Section<V> s : sections) {
+ if (s.size != 0) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+    /**
+     * @param value the value to look for; must not be null
+     * @return true if the value is present in the set
+     */
+    public boolean contains(V value) {
+        checkNotNull(value);
+        final long valueHash = hash(value);
+        final Section<V> section = getSection(valueHash);
+        return section.contains(value, (int) valueHash);
+    }
+
+    /**
+     * @param value the value to add; must not be null
+     * @return true if the value was inserted, false if it was already present
+     */
+    public boolean add(V value) {
+        checkNotNull(value);
+        final long valueHash = hash(value);
+        final Section<V> section = getSection(valueHash);
+        return section.add(value, (int) valueHash);
+    }
+
+    /**
+     * @param value the value to remove; must not be null
+     * @return true if the value was present and has been removed
+     */
+    public boolean remove(V value) {
+        checkNotNull(value);
+        final long valueHash = hash(value);
+        final Section<V> section = getSection(valueHash);
+        return section.remove(value, (int) valueHash);
+    }
+
+    /**
+     * Picks the section responsible for a hash.
+     *
+     * The section count equals the caller-supplied concurrency level, which is not required to be a power of two.
+     * A bit mask would then leave some sections unreachable (with 3 sections the mask is {@code 0b10}, so index 1
+     * is never selected). {@link Math#floorMod(int, int)} yields the same index as the mask for power-of-two counts
+     * and spreads over all sections otherwise.
+     *
+     * @param hash the 64-bit hash of the value
+     * @return the section that stores values with this hash
+     */
+    private Section<V> getSection(long hash) {
+        // Use 32 msb out of long to get the section
+        final int sectionIdx = Math.floorMod((int) (hash >>> 32), sections.length);
+        return sections[sectionIdx];
+    }
+
+    /**
+     * Clears every section, one after another.
+     */
+    public void clear() {
+        for (int idx = 0; idx < sections.length; idx++) {
+            sections[idx].clear();
+        }
+    }
+
+    /**
+     * Passes every value to {@code processor}, visiting the sections in order.
+     *
+     * @param processor callback invoked with each value
+     */
+    public void forEach(Consumer<? super V> processor) {
+        for (int idx = 0; idx < sections.length; idx++) {
+            sections[idx].forEach(processor);
+        }
+    }
+
+ /**
+ * @return a new list of all values (makes a copy)
+ */
+ List<V> values() {
+ List<V> values = Lists.newArrayList();
+ forEach(value -> values.add(value));
+ return values;
+ }
+
+ // A section is a portion of the hash set that is covered by a single lock
+ @SuppressWarnings("serial")
+ private static final class Section<V> extends StampedLock {
+ private V[] values;
+
+ private int capacity;
+ private volatile int size;
+ private int usedBuckets;
+ private int resizeThreshold;
+
+ Section(int capacity) {
+ this.capacity = alignToPowerOfTwo(capacity);
+ this.values = (V[]) new Object[this.capacity];
+ this.size = 0;
+ this.usedBuckets = 0;
+ this.resizeThreshold = (int) (this.capacity * MapFillFactor);
+ }
+
+ boolean contains(V value, int keyHash) {
+ int bucket = keyHash;
+
+ long stamp = tryOptimisticRead();
+ boolean acquiredLock = false;
+
+ try {
+ while (true) {
+ int capacity = this.capacity;
+ bucket = signSafeMod(bucket, capacity);
+
+ // First try optimistic locking
+ V storedValue = values[bucket];
+
+ if (!acquiredLock && validate(stamp)) {
+ // The values we have read are consistent
+ if (value.equals(storedValue)) {
+ return true;
+ } else if (storedValue == EmptyValue) {
+ // Not found
+ return false;
+ }
+ } else {
+ // Fallback to acquiring read lock
+ if (!acquiredLock) {
+ stamp = readLock();
+ acquiredLock = true;
+
+ storedValue = values[bucket];
+ }
+
+ if (capacity != this.capacity) {
+ // There has been a rehashing. We need to restart the search
+ bucket = keyHash;
+ continue;
+ }
+
+ if (value.equals(storedValue)) {
+ return true;
+ } else if (storedValue == EmptyValue) {
+ // Not found
+ return false;
+ }
+ }
+
+ ++bucket;
+ }
+ } finally {
+ if (acquiredLock) {
+ unlockRead(stamp);
+ }
+ }
+ }
+
+ boolean add(V value, int keyHash) {
+ int bucket = keyHash;
+
+ long stamp = writeLock();
+ int capacity = this.capacity;
+
+ // Remember where we find the first available spot
+ int firstDeletedValue = -1;
+
+ try {
+ while (true) {
+ bucket = signSafeMod(bucket, capacity);
+
+ V storedValue = values[bucket];
+
+ if (value.equals(storedValue)) {
+ return false;
+ } else if (storedValue == EmptyValue) {
+ // Found an empty bucket. This means the value is not in the set. If we've already seen a
+ // deleted value, we should write at that position
+ if (firstDeletedValue != -1) {
+ bucket = firstDeletedValue;
+ } else {
+ ++usedBuckets;
+ }
+
+ values[bucket] = value;
+ ++size;
+ return true;
+ } else if (storedValue == DeletedValue) {
+ // The bucket contained a different deleted key
+ if (firstDeletedValue == -1) {
+ firstDeletedValue = bucket;
+ }
+ }
+
+ ++bucket;
+ }
+ } finally {
+ if (usedBuckets > resizeThreshold) {
+ try {
+ rehash();
+ } finally {
+ unlockWrite(stamp);
+ }
+ } else {
+ unlockWrite(stamp);
+ }
+ }
+ }
+
+ private boolean remove(V value, int keyHash) {
+ int bucket = keyHash;
+ long stamp = writeLock();
+
+ try {
+ while (true) {
+ int capacity = this.capacity;
+ bucket = signSafeMod(bucket, capacity);
+
+ V storedValue = values[bucket];
+ if (value.equals(storedValue)) {
+ --size;
+
+ int nextInArray = signSafeMod(bucket + 1, capacity);
+ if (values[nextInArray] == EmptyValue) {
+ values[bucket] = (V) EmptyValue;
+ --usedBuckets;
+ } else {
+ values[bucket] = (V) DeletedValue;
+ }
+
+ return true;
+ } else if (storedValue == EmptyValue) {
+ // Value wasn't found
+ return false;
+ }
+
+ ++bucket;
+ }
+
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+ void clear() {
+ long stamp = writeLock();
+
+ try {
+ Arrays.fill(values, EmptyValue);
+ this.size = 0;
+ this.usedBuckets = 0;
+ } finally {
+ unlockWrite(stamp);
+ }
+ }
+
+        /**
+         * Visits every live value of this section. Iteration starts under an optimistic stamp; the first time the
+         * stamp fails validation the read lock is taken and held until the iteration ends.
+         *
+         * @param processor callback invoked with each value
+         */
+        public void forEach(Consumer<? super V> processor) {
+            long stamp = tryOptimisticRead();
+
+            int bucketCount = this.capacity;
+            V[] snapshot = this.values;
+
+            boolean holdsReadLock = false;
+
+            try {
+                // Validate no rehashing
+                if (!validate(stamp)) {
+                    // Fallback to read lock
+                    stamp = readLock();
+                    holdsReadLock = true;
+
+                    bucketCount = this.capacity;
+                    snapshot = this.values;
+                }
+
+                // Go through all the buckets for this section
+                int bucket = 0;
+                while (bucket < bucketCount) {
+                    V storedValue = snapshot[bucket];
+
+                    if (!holdsReadLock && !validate(stamp)) {
+                        // Fallback to acquiring read lock and re-read the bucket under it
+                        stamp = readLock();
+                        holdsReadLock = true;
+
+                        storedValue = snapshot[bucket];
+                    }
+
+                    final boolean vacant = storedValue == DeletedValue || storedValue == EmptyValue;
+                    if (!vacant) {
+                        processor.accept(storedValue);
+                    }
+
+                    bucket++;
+                }
+            } finally {
+                if (holdsReadLock) {
+                    unlockRead(stamp);
+                }
+            }
+        }
+
+ private void rehash() {
+ // Expand the hashmap
+ int newCapacity = capacity * 2;
+ V[] newValues = (V[]) new Object[newCapacity];
+
+ // Re-hash table
+ for (int i = 0; i < values.length; i++) {
+ V storedValue = values[i];
+ if (storedValue != EmptyValue && storedValue != DeletedValue) {
+ insertValueNoLock(newValues, storedValue);
+ }
+ }
+
+ capacity = newCapacity;
+ values = newValues;
+ usedBuckets = size;
+ resizeThreshold = (int) (capacity * MapFillFactor);
+ }
+
+        /**
+         * Writes a value into the first empty bucket found by linear probing from the value's home bucket.
+         * Performs no locking and no check for an existing equal value.
+         *
+         * @param values array to insert into; its length is used as the bucket count
+         * @param value  the value to insert
+         */
+        private static <V> void insertValueNoLock(V[] values, V value) {
+            int bucket = signSafeMod(hash(value), values.length);
+
+            // Skip occupied buckets until an empty one is found
+            while (values[bucket] != EmptyValue) {
+                bucket = signSafeMod(bucket + 1, values.length);
+            }
+
+            values[bucket] = value;
+        }
+ }
+
+    private static final long HashMixer = 0xc6a4a7935bd1e995L;
+    private static final int R = 47;
+
+    /**
+     * Mixes {@code key.hashCode()} into a 64-bit hash: multiply by {@code HashMixer}, fold the upper bits down
+     * with an unsigned shift by {@code R}, then multiply again.
+     *
+     * @param key the object to hash; must not be null
+     * @return the 64-bit hash
+     */
+    final static <K> long hash(K key) {
+        long mixed = key.hashCode() * HashMixer;
+        mixed = (mixed ^ (mixed >>> R)) * HashMixer;
+        return mixed;
+    }
+
+    /**
+     * Maps a hash to a bucket index.
+     *
+     * @param n   the hash; only its low 32 bits are used
+     * @param Max the number of buckets; the mask assumes a power of two
+     * @return the low 32 bits of {@code n} masked with {@code Max - 1}
+     */
+    static final int signSafeMod(long n, int Max) {
+        final int lowBits = (int) n;
+        return lowBits & (Max - 1);
+    }
+
+    /**
+     * Rounds {@code n} up to the nearest power of two using integer arithmetic.
+     *
+     * The floating-point {@code Math.pow} form saturates to {@code Integer.MAX_VALUE}, which is not a power of
+     * two, once the result no longer fits in an int. That silently breaks the bucket masks. Out-of-range input is
+     * rejected here instead.
+     *
+     * @param n the value to round up; must be in the range [1, 2^30]
+     * @return the smallest power of two that is greater than or equal to {@code n}
+     * @throws IllegalArgumentException if {@code n} is outside the supported range
+     */
+    private static final int alignToPowerOfTwo(int n) {
+        if (n <= 0 || n > (1 << 30)) {
+            throw new IllegalArgumentException("Cannot align " + n + " to a power of two that fits in an int");
+        }
+        return 1 << (32 - Integer.numberOfLeadingZeros(n - 1));
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
new file mode 100644
index 0000000..44f42b8
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -0,0 +1,435 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongFunction;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class ConcurrentLongHashMapTest {
+
+    /**
+     * Each of the three argument combinations below is expected to be rejected with an IllegalArgumentException.
+     */
+    @Test
+    public void testConstructor() {
+        // Zero expected items
+        try {
+            new ConcurrentLongHashMap<String>(0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException expected) {
+            // ok
+        }
+
+        // Zero concurrency level
+        try {
+            new ConcurrentLongHashMap<String>(16, 0);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException expected) {
+            // ok
+        }
+
+        // Fewer expected items than the concurrency level
+        try {
+            new ConcurrentLongHashMap<String>(4, 8);
+            fail("should have thrown exception");
+        } catch (IllegalArgumentException expected) {
+            // ok
+        }
+    }
+
+    /**
+     * Exercises put/get/remove on a handful of keys and checks size after each step. Assertions follow the JUnit
+     * (expected, actual) argument order so that failure messages report the right values.
+     */
+    @Test
+    public void simpleInsertions() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16);
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        assertNull(map.put(2, "two"));
+        assertNull(map.put(3, "three"));
+
+        assertEquals(3, map.size());
+
+        assertEquals("one", map.get(1));
+        assertEquals(3, map.size());
+
+        assertEquals("one", map.remove(1));
+        assertEquals(2, map.size());
+        assertNull(map.get(1));
+        assertNull(map.get(5));
+        assertEquals(2, map.size());
+
+        assertNull(map.put(1, "one"));
+        assertEquals(3, map.size());
+        assertEquals("one", map.put(1, "uno"));
+        assertEquals(3, map.size());
+    }
+
+    /**
+     * Conditional removal must only succeed when both the key and the value match.
+     */
+    @Test
+    public void testRemove() {
+        final ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertTrue(map.isEmpty());
+        assertNull(map.put(1, "one"));
+        assertFalse(map.isEmpty());
+
+        // Unknown key, then known key with a non-matching value
+        assertFalse(map.remove(0, "zero"));
+        assertFalse(map.remove(1, "uno"));
+        assertFalse(map.isEmpty());
+
+        // Matching key and value
+        assertTrue(map.remove(1, "one"));
+        assertTrue(map.isEmpty());
+    }
+
+    /**
+     * The used-bucket count must not go below zero when the same key is removed twice.
+     */
+    @Test
+    public void testNegativeUsedBucketCount() {
+        final ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+        // Insert, then overwrite: still a single used bucket
+        map.put(0, "zero");
+        assertEquals(1, map.getUsedBucketCount());
+        map.put(0, "zero1");
+        assertEquals(1, map.getUsedBucketCount());
+
+        // Remove, then remove again: the count stays at zero
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+        map.remove(0);
+        assertEquals(0, map.getUsedBucketCount());
+    }
+
+    /**
+     * Inserting n keys into a single-section map of capacity n is expected to double the capacity. Assertions
+     * follow the JUnit (expected, actual) argument order.
+     */
+    @Test
+    public void testRehashing() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(n, map.capacity());
+        assertEquals(0, map.size());
+
+        for (int i = 0; i < n; i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(2 * n, map.capacity());
+        assertEquals(n, map.size());
+    }
+
+    /**
+     * Same as the rehashing test, but with a round of insertions and removals before the n final insertions.
+     * Assertions follow the JUnit (expected, actual) argument order.
+     */
+    @Test
+    public void testRehashingWithDeletes() {
+        int n = 16;
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(n / 2, 1);
+        assertEquals(n, map.capacity());
+        assertEquals(0, map.size());
+
+        for (int i = 0; i < n / 2; i++) {
+            map.put(i, i);
+        }
+
+        for (int i = 0; i < n / 2; i++) {
+            map.remove(i);
+        }
+
+        for (int i = n; i < (2 * n); i++) {
+            map.put(i, i);
+        }
+
+        assertEquals(2 * n, map.capacity());
+        assertEquals(n, map.size());
+    }
+
+    /**
+     * Inserts random keys from 16 threads and checks the final size. The executor is shut down in a finally
+     * block so that its threads are released even when a task or the assertion fails.
+     */
+    @Test
+    public void concurrentInsertions() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            for (int i = 0; i < nThreads; i++) {
+                final int threadIdx = i;
+
+                futures.add(executor.submit(() -> {
+                    Random random = new Random();
+
+                    for (int j = 0; j < N; j++) {
+                        long key = random.nextLong();
+                        // Round the key to a multiple of (threadIdx + 1). This does not guarantee uniqueness;
+                        // the size check below relies on random 64-bit keys practically never colliding.
+                        key -= key % (threadIdx + 1);
+
+                        map.put(key, value);
+                    }
+                }));
+            }
+
+            // Propagates any failure raised inside a worker
+            for (Future<?> future : futures) {
+                future.get();
+            }
+
+            assertEquals(N * nThreads, map.size());
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Inserts random keys from 16 threads and reads each key back right after inserting it, while the other
+     * threads keep writing. Nothing is removed in this test, so every read is expected to see the inserted value.
+     * The executor is shut down in a finally block so that its threads are released even on failure.
+     */
+    @Test
+    public void concurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int nThreads = 16;
+        final int N = 100_000;
+        String value = "value";
+
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            for (int i = 0; i < nThreads; i++) {
+                final int threadIdx = i;
+
+                futures.add(executor.submit(() -> {
+                    Random random = new Random();
+
+                    for (int j = 0; j < N; j++) {
+                        long key = random.nextLong();
+                        // Round the key to a multiple of (threadIdx + 1). This does not guarantee uniqueness;
+                        // the size check below relies on random 64-bit keys practically never colliding.
+                        key -= key % (threadIdx + 1);
+
+                        map.put(key, value);
+                        // Read concurrently with the writers running in the other threads
+                        assertEquals(value, map.get(key));
+                    }
+                }));
+            }
+
+            // Propagates any failure (including assertion errors) raised inside a worker
+            for (Future<?> future : futures) {
+                future.get();
+            }
+
+            assertEquals(N * nThreads, map.size());
+        } finally {
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Checks keys() and values() on an empty map, after inserts, after a removal, after an overwrite and after
+     * clear(). Assertions follow the JUnit (expected, actual) argument order.
+     */
+    @Test
+    public void testIteration() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+
+        assertEquals(Collections.emptyList(), map.keys());
+        assertEquals(Collections.emptyList(), map.values());
+
+        map.put(0, "zero");
+
+        assertEquals(Lists.newArrayList(0L), map.keys());
+        assertEquals(Lists.newArrayList("zero"), map.values());
+
+        map.remove(0);
+
+        assertEquals(Collections.emptyList(), map.keys());
+        assertEquals(Collections.emptyList(), map.values());
+
+        map.put(0, "zero");
+        map.put(1, "one");
+        map.put(2, "two");
+
+        // The lists are sorted before comparing, so the test does not depend on iteration order
+        List<Long> keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(Lists.newArrayList(0L, 1L, 2L), keys);
+
+        List<String> values = map.values();
+        Collections.sort(values);
+        assertEquals(Lists.newArrayList("one", "two", "zero"), values);
+
+        map.put(1, "uno");
+
+        keys = map.keys();
+        Collections.sort(keys);
+        assertEquals(Lists.newArrayList(0L, 1L, 2L), keys);
+
+        values = map.values();
+        Collections.sort(values);
+        assertEquals(Lists.newArrayList("two", "uno", "zero"), values);
+
+        map.clear();
+        assertTrue(map.isEmpty());
+    }
+
+    /**
+     * Inserts, removes and re-inserts two keys that land in the same bucket. Assertions follow the JUnit
+     * (expected, actual) argument order.
+     */
+    @Test
+    public void testHashConflictWithDeletion() {
+        final int Buckets = 16;
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(Buckets, 1);
+
+        // Pick 2 keys that fall into the same bucket
+        long key1 = 1;
+        long key2 = 27;
+
+        int bucket1 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key1), Buckets);
+        int bucket2 = ConcurrentLongHashMap.signSafeMod(ConcurrentLongHashMap.hash(key2), Buckets);
+        assertEquals(bucket1, bucket2);
+
+        assertNull(map.put(key1, "value-1"));
+        assertNull(map.put(key2, "value-2"));
+        assertEquals(2, map.size());
+
+        assertEquals("value-1", map.remove(key1));
+        assertEquals(1, map.size());
+
+        assertNull(map.put(key1, "value-1-overwrite"));
+        assertEquals(2, map.size());
+
+        assertEquals("value-1-overwrite", map.remove(key1));
+        assertEquals(1, map.size());
+
+        assertEquals("value-2", map.put(key2, "value-2-overwrite"));
+        assertEquals("value-2-overwrite", map.get(key2));
+
+        assertEquals(1, map.size());
+        assertEquals("value-2-overwrite", map.remove(key2));
+        assertTrue(map.isEmpty());
+    }
+
+    /**
+     * putIfAbsent must insert when the key is missing and leave an existing value untouched. Assertions follow
+     * the JUnit (expected, actual) argument order.
+     */
+    @Test
+    public void testPutIfAbsent() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
+        assertNull(map.putIfAbsent(1, "one"));
+        assertEquals("one", map.get(1));
+
+        assertEquals("one", map.putIfAbsent(1, "uno"));
+        assertEquals("one", map.get(1));
+    }
+
+    /**
+     * The provider hands out consecutive integers, so a repeated computeIfAbsent on an existing key must return
+     * the value produced the first time and must not consume another number. Assertions follow the JUnit
+     * (expected, actual) argument order.
+     */
+    @Test
+    public void testComputeIfAbsent() {
+        ConcurrentLongHashMap<Integer> map = new ConcurrentLongHashMap<>(16, 1);
+        AtomicInteger counter = new AtomicInteger();
+        LongFunction<Integer> provider = key -> counter.getAndIncrement();
+
+        assertEquals(0, map.computeIfAbsent(0, provider).intValue());
+        assertEquals(0, map.get(0).intValue());
+
+        assertEquals(1, map.computeIfAbsent(1, provider).intValue());
+        assertEquals(1, map.get(1).intValue());
+
+        // Key 1 already exists: the provider must not be called again
+        assertEquals(1, map.computeIfAbsent(1, provider).intValue());
+        assertEquals(1, map.get(1).intValue());
+
+        assertEquals(2, map.computeIfAbsent(2, provider).intValue());
+        assertEquals(2, map.get(2).intValue());
+    }
+
+ final static int Iterations = 1;
+ final static int ReadIterations = 100;
+ final static int N = 1_000_000;
+
+    /**
+     * Benchmark body for ConcurrentLongHashMap: N inserts, ReadIterations passes of N reads, then N removes,
+     * repeated Iterations times. Each operation uses the inner index as the key so that N distinct keys are
+     * exercised instead of one key repeatedly.
+     */
+    public void benchConcurrentLongHashMap() throws Exception {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(N, 1);
+
+        for (int i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                map.put(j, "value");
+            }
+
+            for (int h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get(j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove(j);
+            }
+        }
+    }
+
+    /**
+     * Benchmark body for java.util.concurrent.ConcurrentHashMap: N inserts, ReadIterations passes of N reads,
+     * then N removes, repeated Iterations times. Each operation uses the inner index as the key so that N
+     * distinct keys are exercised instead of one key repeatedly.
+     */
+    public void benchConcurrentHashMap() throws Exception {
+        ConcurrentHashMap<Long, String> map = new ConcurrentHashMap<Long, String>(N, 0.66f, 1);
+
+        for (int i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                // The cast matters: an int would box to Integer and never match a Long key
+                map.put((long) j, "value");
+            }
+
+            for (int h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get((long) j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove((long) j);
+            }
+        }
+    }
+
+    /**
+     * Benchmark body for java.util.HashMap: N inserts, ReadIterations passes of N reads, then N removes, repeated
+     * Iterations times. Each operation uses the inner index as the key so that N distinct keys are exercised
+     * instead of one key repeatedly.
+     */
+    void benchHashMap() throws Exception {
+        HashMap<Long, String> map = new HashMap<Long, String>(N, 0.66f);
+
+        for (int i = 0; i < Iterations; i++) {
+            for (int j = 0; j < N; j++) {
+                // The cast matters: an int would box to Integer and never match a Long key
+                map.put((long) j, "value");
+            }
+
+            for (int h = 0; h < ReadIterations; h++) {
+                for (int j = 0; j < N; j++) {
+                    map.get((long) j);
+                }
+            }
+
+            for (int j = 0; j < N; j++) {
+                map.remove((long) j);
+            }
+        }
+    }
+
+    /**
+     * Runs the three benchmarks in sequence and prints the wall-clock time of each. Every printed figure is
+     * backed by an actual run; no timing is taken around a disabled benchmark.
+     */
+    public static void main(String[] args) throws Exception {
+        ConcurrentLongHashMapTest t = new ConcurrentLongHashMapTest();
+
+        long start = System.nanoTime();
+        t.benchHashMap();
+        long end = System.nanoTime();
+
+        System.out.println("HM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+
+        start = System.nanoTime();
+        t.benchConcurrentLongHashMap();
+        end = System.nanoTime();
+
+        System.out.println("CLHM: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
new file mode 100644
index 0000000..5a8d904
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -0,0 +1,275 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+ /**
+  * Unit tests for {@link ConcurrentLongHashSet}: constructor argument validation, single-threaded
+  * add/contains/remove, rehashing, iteration and concurrent insertions.
+  */
+ public class ConcurrentLongHashSetTest {
+
+     /** Each of these constructor argument combinations must be rejected. */
+     @Test
+     public void testConstructor() {
+         try {
+             new ConcurrentLongHashSet(0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongHashSet(16, 0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongHashSet(4, 8);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+     }
+
+     /** Basic add/contains/remove sequence, checking the reported size after every step. */
+     @Test
+     public void simpleInsertions() {
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet(16);
+
+         assertTrue(set.isEmpty());
+         assertTrue(set.add(1));
+         assertFalse(set.isEmpty());
+
+         assertTrue(set.add(2));
+         assertTrue(set.add(3));
+
+         assertEquals(3, set.size());
+
+         assertTrue(set.contains(1));
+         assertEquals(3, set.size());
+
+         assertTrue(set.remove(1));
+         assertEquals(2, set.size());
+         assertFalse(set.contains(1));
+         assertFalse(set.contains(5));
+         assertEquals(2, set.size());
+
+         assertTrue(set.add(1));
+         assertEquals(3, set.size());
+         // Adding an element that is already present must report false and keep the size
+         assertFalse(set.add(1));
+         assertEquals(3, set.size());
+     }
+
+     /** Removing an absent value must report false and leave the set untouched. */
+     @Test
+     public void testRemove() {
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+         assertTrue(set.isEmpty());
+         assertTrue(set.add(1));
+         assertFalse(set.isEmpty());
+
+         assertFalse(set.remove(0));
+         assertFalse(set.isEmpty());
+         assertTrue(set.remove(1));
+         assertTrue(set.isEmpty());
+     }
+
+     /** Inserting {@code n} items into a set whose capacity is {@code n} must double the capacity. */
+     @Test
+     public void testRehashing() {
+         int n = 16;
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+         assertEquals(n, set.capacity());
+         assertEquals(0, set.size());
+
+         for (int i = 0; i < n; i++) {
+             set.add(i);
+         }
+
+         assertEquals(2 * n, set.capacity());
+         assertEquals(n, set.size());
+     }
+
+     /** Same as {@link #testRehashing()}, but with a batch of inserts and deletes done beforehand. */
+     @Test
+     public void testRehashingWithDeletes() {
+         int n = 16;
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet(n / 2, 1);
+         assertEquals(n, set.capacity());
+         assertEquals(0, set.size());
+
+         for (int i = 0; i < n / 2; i++) {
+             set.add(i);
+         }
+
+         for (int i = 0; i < n / 2; i++) {
+             set.remove(i);
+         }
+
+         for (int i = n; i < (2 * n); i++) {
+             set.add(i);
+         }
+
+         assertEquals(2 * n, set.capacity());
+         assertEquals(n, set.size());
+     }
+
+     /** 16 threads insert random keys concurrently; no insertion may be lost. */
+     @Test
+     public void concurrentInsertions() throws Throwable {
+         runConcurrentInsertions(false);
+     }
+
+     /**
+      * Same workload as {@link #concurrentInsertions()}, but every worker also reads back each key
+      * right after adding it, while the other workers keep inserting.
+      */
+     @Test
+     public void concurrentInsertionsAndReads() throws Throwable {
+         runConcurrentInsertions(true);
+     }
+
+     /**
+      * Inserts random non-negative keys from several threads into a fresh set and checks the
+      * resulting size.
+      *
+      * @param readBack when {@code true}, each worker asserts {@code contains(key)} after every add
+      * @throws Exception if a worker task fails or the calling thread is interrupted
+      */
+     private static void runConcurrentInsertions(boolean readBack) throws Exception {
+         final ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+         final int nThreads = 16;
+         final int insertsPerThread = 100_000;
+
+         ExecutorService executor = Executors.newCachedThreadPool();
+         try {
+             List<Future<?>> futures = new ArrayList<>();
+             for (int i = 0; i < nThreads; i++) {
+                 final int threadIdx = i;
+
+                 futures.add(executor.submit(() -> {
+                     Random random = new Random();
+
+                     for (int j = 0; j < insertsPerThread; j++) {
+                         // Mask the sign bit instead of using Math.abs(), which returns a
+                         // negative value for Long.MIN_VALUE
+                         long key = random.nextLong() & Long.MAX_VALUE;
+                         // Round down to a multiple of (threadIdx + 1). This does not strictly
+                         // guarantee uniqueness: the size assertion below relies on collisions
+                         // between random 63-bit keys being extremely unlikely.
+                         key -= key % (threadIdx + 1);
+
+                         set.add(key);
+                         if (readBack) {
+                             assertTrue(set.contains(key));
+                         }
+                     }
+                 }));
+             }
+
+             // Propagates any failure raised inside a worker
+             for (Future<?> future : futures) {
+                 future.get();
+             }
+
+             assertEquals(insertsPerThread * nThreads, set.size());
+         } finally {
+             // Release the pool even when a worker or the size assertion fails
+             executor.shutdown();
+         }
+     }
+
+     /** {@code items()} must reflect the content of the set after adds, removes and clear. */
+     @Test
+     public void testIteration() {
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet();
+
+         assertEquals(Collections.emptySet(), set.items());
+
+         set.add(0L);
+
+         assertEquals(Sets.newHashSet(0L), set.items());
+
+         set.remove(0L);
+
+         assertEquals(Collections.emptySet(), set.items());
+
+         set.add(0L);
+         set.add(1L);
+         set.add(2L);
+
+         // Sort before comparing, so the check does not depend on iteration order
+         List<Long> values = Lists.newArrayList(set.items());
+         Collections.sort(values);
+         assertEquals(Lists.newArrayList(0L, 1L, 2L), values);
+
+         set.clear();
+         assertTrue(set.isEmpty());
+     }
+
+     /** Two keys landing in the same bucket must stay independent across removals and re-adds. */
+     @Test
+     public void testHashConflictWithDeletion() {
+         final int buckets = 16;
+         ConcurrentLongHashSet set = new ConcurrentLongHashSet(buckets, 1);
+
+         // Pick 2 keys that fall into the same bucket
+         long key1 = 1;
+         long key2 = 27;
+
+         // NOTE(review): the bucket is computed with ConcurrentOpenHashSet's helpers rather than
+         // ConcurrentLongHashSet's own - confirm both hash these keys identically.
+         int bucket1 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key1), buckets);
+         int bucket2 = ConcurrentOpenHashSet.signSafeMod(ConcurrentOpenHashSet.hash(key2), buckets);
+         assertEquals(bucket1, bucket2);
+
+         assertTrue(set.add(key1));
+         assertTrue(set.add(key2));
+         assertEquals(2, set.size());
+
+         assertTrue(set.remove(key1));
+         assertEquals(1, set.size());
+
+         assertTrue(set.add(key1));
+         assertEquals(2, set.size());
+
+         assertTrue(set.remove(key1));
+         assertEquals(1, set.size());
+
+         // key2 must still be found after its bucket neighbour was removed
+         assertFalse(set.add(key2));
+         assertTrue(set.contains(key2));
+
+         assertEquals(1, set.size());
+         assertTrue(set.remove(key2));
+         assertTrue(set.isEmpty());
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
new file mode 100644
index 0000000..a7492a1
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -0,0 +1,473 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+ /**
+  * Unit tests for {@link ConcurrentLongLongHashMap}: constructor argument validation, basic
+  * put/get/remove, rehashing, iteration, conditional updates and concurrent insertions.
+  *
+  * <p>As the assertions below show, the map reports {@code -1} for a missing key and rejects
+  * negative keys with {@link IllegalArgumentException}.
+  */
+ public class ConcurrentLongLongHashMapTest {
+
+     /** Each of these constructor argument combinations must be rejected. */
+     @Test
+     public void testConstructor() {
+         try {
+             new ConcurrentLongLongHashMap(0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongLongHashMap(16, 0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongLongHashMap(4, 8);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+     }
+
+     /** Basic put/get/remove sequence, checking returned previous values and the size. */
+     @Test
+     public void simpleInsertions() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16);
+
+         assertTrue(map.isEmpty());
+         assertEquals(-1, map.put(1, 11));
+         assertFalse(map.isEmpty());
+
+         assertEquals(-1, map.put(2, 22));
+         assertEquals(-1, map.put(3, 33));
+
+         assertEquals(3, map.size());
+
+         assertEquals(11, map.get(1));
+         assertEquals(3, map.size());
+
+         assertEquals(11, map.remove(1));
+         assertEquals(2, map.size());
+         assertEquals(-1, map.get(1));
+         assertEquals(-1, map.get(5));
+         assertEquals(2, map.size());
+
+         assertEquals(-1, map.put(1, 11));
+         assertEquals(3, map.size());
+         // Overwriting returns the previous value and does not change the size
+         assertEquals(11, map.put(1, 111));
+         assertEquals(3, map.size());
+     }
+
+     /** The conditional remove only succeeds when both key and value match. */
+     @Test
+     public void testRemove() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+         assertTrue(map.isEmpty());
+         assertEquals(-1, map.put(1, 11));
+         assertFalse(map.isEmpty());
+
+         assertFalse(map.remove(0, 0));
+         assertFalse(map.remove(1, 111));
+
+         assertFalse(map.isEmpty());
+         assertTrue(map.remove(1, 11));
+         assertTrue(map.isEmpty());
+     }
+
+     /** Overwrites and repeated removes must not push the used-bucket count above 1 or below 0. */
+     @Test
+     public void testNegativeUsedBucketCount() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+         map.put(0, 0);
+         assertEquals(1, map.getUsedBucketCount());
+         map.put(0, 1);
+         assertEquals(1, map.getUsedBucketCount());
+         map.remove(0);
+         assertEquals(0, map.getUsedBucketCount());
+         map.remove(0);
+         assertEquals(0, map.getUsedBucketCount());
+     }
+
+     /** Inserting {@code n} items into a map whose capacity is {@code n} must double the capacity. */
+     @Test
+     public void testRehashing() {
+         int n = 16;
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+         assertEquals(n, map.capacity());
+         assertEquals(0, map.size());
+
+         for (int i = 0; i < n; i++) {
+             map.put(i, i);
+         }
+
+         assertEquals(2 * n, map.capacity());
+         assertEquals(n, map.size());
+     }
+
+     /** Same as {@link #testRehashing()}, but with a batch of inserts and deletes done beforehand. */
+     @Test
+     public void testRehashingWithDeletes() {
+         int n = 16;
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(n / 2, 1);
+         assertEquals(n, map.capacity());
+         assertEquals(0, map.size());
+
+         for (int i = 0; i < n / 2; i++) {
+             map.put(i, i);
+         }
+
+         for (int i = 0; i < n / 2; i++) {
+             map.remove(i);
+         }
+
+         for (int i = n; i < (2 * n); i++) {
+             map.put(i, i);
+         }
+
+         assertEquals(2 * n, map.capacity());
+         assertEquals(n, map.size());
+     }
+
+     /** 16 threads insert random keys concurrently; no insertion may be lost. */
+     @Test
+     public void concurrentInsertions() throws Throwable {
+         runConcurrentInsertions(false);
+     }
+
+     /**
+      * Same workload as {@link #concurrentInsertions()}, but every worker also reads back each key
+      * right after writing it, while the other workers keep inserting.
+      */
+     @Test
+     public void concurrentInsertionsAndReads() throws Throwable {
+         runConcurrentInsertions(true);
+     }
+
+     /**
+      * Inserts random non-negative keys from several threads into a fresh map and checks the
+      * resulting size.
+      *
+      * @param readBack when {@code true}, each worker asserts that {@code get(key)} returns the
+      *            value it has just put
+      * @throws Exception if a worker task fails or the calling thread is interrupted
+      */
+     private static void runConcurrentInsertions(boolean readBack) throws Exception {
+         final ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+         final int nThreads = 16;
+         final int insertsPerThread = 100_000;
+         final long value = 55;
+
+         ExecutorService executor = Executors.newCachedThreadPool();
+         try {
+             List<Future<?>> futures = new ArrayList<>();
+             for (int i = 0; i < nThreads; i++) {
+                 final int threadIdx = i;
+
+                 futures.add(executor.submit(() -> {
+                     Random random = new Random();
+
+                     for (int j = 0; j < insertsPerThread; j++) {
+                         // Mask the sign bit instead of using Math.abs(): Math.abs(Long.MIN_VALUE)
+                         // is negative, and the map rejects negative keys
+                         long key = random.nextLong() & Long.MAX_VALUE;
+                         // Round down to a multiple of (threadIdx + 1). This does not strictly
+                         // guarantee uniqueness: the size assertion below relies on collisions
+                         // between random 63-bit keys being extremely unlikely.
+                         key -= key % (threadIdx + 1);
+
+                         map.put(key, value);
+                         if (readBack) {
+                             assertEquals(value, map.get(key));
+                         }
+                     }
+                 }));
+             }
+
+             // Propagates any failure raised inside a worker
+             for (Future<?> future : futures) {
+                 future.get();
+             }
+
+             assertEquals(insertsPerThread * nThreads, map.size());
+         } finally {
+             // Release the pool even when a worker or the size assertion fails
+             executor.shutdown();
+         }
+     }
+
+     /** {@code keys()} and {@code values()} must track puts, overwrites, removes and clear. */
+     @Test
+     public void testIteration() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+
+         assertEquals(Collections.emptyList(), map.keys());
+         assertEquals(Collections.emptyList(), map.values());
+
+         map.put(0, 0);
+
+         assertEquals(Lists.newArrayList(0L), map.keys());
+         assertEquals(Lists.newArrayList(0L), map.values());
+
+         map.remove(0);
+
+         assertEquals(Collections.emptyList(), map.keys());
+         assertEquals(Collections.emptyList(), map.values());
+
+         map.put(0, 0);
+         map.put(1, 11);
+         map.put(2, 22);
+
+         // Sort before comparing, so the checks do not depend on iteration order
+         List<Long> keys = map.keys();
+         Collections.sort(keys);
+         assertEquals(Lists.newArrayList(0L, 1L, 2L), keys);
+
+         List<Long> values = map.values();
+         Collections.sort(values);
+         assertEquals(Lists.newArrayList(0L, 11L, 22L), values);
+
+         map.put(1, 111);
+
+         keys = map.keys();
+         Collections.sort(keys);
+         assertEquals(Lists.newArrayList(0L, 1L, 2L), keys);
+
+         values = map.values();
+         Collections.sort(values);
+         assertEquals(Lists.newArrayList(0L, 22L, 111L), values);
+
+         map.clear();
+         assertTrue(map.isEmpty());
+     }
+
+     /** Two keys landing in the same bucket must stay independent across removals and re-puts. */
+     @Test
+     public void testHashConflictWithDeletion() {
+         final int buckets = 16;
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(buckets, 1);
+
+         // Pick 2 keys that fall into the same bucket
+         long key1 = 1;
+         long key2 = 27;
+
+         int bucket1 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key1), buckets);
+         int bucket2 = ConcurrentLongLongHashMap.signSafeMod(ConcurrentLongLongHashMap.hash(key2), buckets);
+         assertEquals(bucket1, bucket2);
+
+         final long value1 = 1;
+         final long value2 = 2;
+         final long value1Overwrite = 3;
+         final long value2Overwrite = 3;
+
+         assertEquals(-1, map.put(key1, value1));
+         assertEquals(-1, map.put(key2, value2));
+         assertEquals(2, map.size());
+
+         assertEquals(value1, map.remove(key1));
+         assertEquals(1, map.size());
+
+         assertEquals(-1, map.put(key1, value1Overwrite));
+         assertEquals(2, map.size());
+
+         assertEquals(value1Overwrite, map.remove(key1));
+         assertEquals(1, map.size());
+
+         // key2 must still be found after its bucket neighbour was removed
+         assertEquals(value2, map.put(key2, value2Overwrite));
+         assertEquals(value2Overwrite, map.get(key2));
+
+         assertEquals(1, map.size());
+         assertEquals(value2Overwrite, map.remove(key2));
+         assertTrue(map.isEmpty());
+     }
+
+     /** {@code putIfAbsent} must keep the existing value and return it. */
+     @Test
+     public void testPutIfAbsent() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap();
+         assertEquals(-1, map.putIfAbsent(1, 11));
+         assertEquals(11, map.get(1));
+
+         assertEquals(11, map.putIfAbsent(1, 111));
+         assertEquals(11, map.get(1));
+     }
+
+     /** The provider must be invoked only for keys that are not yet present. */
+     @Test
+     public void testComputeIfAbsent() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+         AtomicLong counter = new AtomicLong();
+         // Hands out 0, 1, 2, ... so that every invocation is observable in the stored value
+         LongLongFunction provider = new LongLongFunction() {
+             @Override
+             public long apply(long key) {
+                 return counter.getAndIncrement();
+             }
+         };
+
+         assertEquals(0, map.computeIfAbsent(0, provider));
+         assertEquals(0, map.get(0));
+
+         assertEquals(1, map.computeIfAbsent(1, provider));
+         assertEquals(1, map.get(1));
+
+         // Key 1 is already present: the provider is not called and the counter stays at 2
+         assertEquals(1, map.computeIfAbsent(1, provider));
+         assertEquals(1, map.get(1));
+
+         assertEquals(2, map.computeIfAbsent(2, provider));
+         assertEquals(2, map.get(2));
+     }
+
+     /** {@code addAndGet} applies a delta and refuses to make the stored value negative. */
+     @Test
+     public void testAddAndGet() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+         assertEquals(0, map.addAndGet(0, 0));
+         assertTrue(map.containsKey(0));
+         assertEquals(0, map.get(0));
+
+         assertFalse(map.containsKey(5));
+
+         assertEquals(5, map.addAndGet(0, 5));
+         assertEquals(5, map.get(0));
+
+         assertEquals(6, map.addAndGet(0, 1));
+         assertEquals(6, map.get(0));
+
+         assertEquals(4, map.addAndGet(0, -2));
+         assertEquals(4, map.get(0));
+
+         // Cannot bring the value below zero
+         try {
+             map.addAndGet(0, -5);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+         // The rejected update must leave the stored value unchanged
+         assertEquals(4, map.get(0));
+     }
+
+     /** {@code removeIf} with a key predicate removes exactly the matching keys. */
+     @Test
+     public void testRemoveIf() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+         map.put(1L, 1L);
+         map.put(2L, 2L);
+         map.put(3L, 3L);
+         map.put(4L, 4L);
+
+         map.removeIf(key -> key < 3);
+         assertFalse(map.containsKey(1L));
+         assertFalse(map.containsKey(2L));
+         assertTrue(map.containsKey(3L));
+         assertTrue(map.containsKey(4L));
+         assertEquals(2, map.size());
+     }
+
+     /** {@code removeIf} with a key/value predicate removes exactly the matching entries. */
+     @Test
+     public void testRemoveIfValue() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+         map.put(1L, 1L);
+         map.put(2L, 2L);
+         map.put(3L, 1L);
+         map.put(4L, 2L);
+
+         map.removeIf((key, value) -> value < 2);
+         assertFalse(map.containsKey(1L));
+         assertTrue(map.containsKey(2L));
+         assertFalse(map.containsKey(3L));
+         assertTrue(map.containsKey(4L));
+         assertEquals(2, map.size());
+     }
+
+     /** Every operation taking a key must reject negative keys. */
+     @Test
+     public void testIvalidKeys() {
+         ConcurrentLongLongHashMap map = new ConcurrentLongLongHashMap(16, 1);
+
+         try {
+             map.put(-5, 4);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.get(-1);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.containsKey(-1);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.putIfAbsent(-1, 1);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.computeIfAbsent(-1, new LongLongFunction() {
+                 @Override
+                 public long apply(long key) {
+                     return 1;
+                 }
+             });
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+     }
+
+     /** {@code asMap()} must expose the same entries as an equivalent {@link Map}. */
+     @Test
+     public void testAsMap() {
+         ConcurrentLongLongHashMap lmap = new ConcurrentLongLongHashMap(16, 1);
+         lmap.put(1, 11);
+         lmap.put(2, 22);
+         lmap.put(3, 33);
+
+         Map<Long, Long> map = Maps.newTreeMap();
+         map.put(1L, 11L);
+         map.put(2L, 22L);
+         map.put(3L, 33L);
+
+         assertEquals(map, lmap.asMap());
+     }
+ }
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/ecbb053e/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
new file mode 100644
index 0000000..23aa327
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+ /**
+  * Unit tests for {@link ConcurrentLongLongPairHashMap}, a map whose keys and values are both
+  * pairs of longs: constructor argument validation, basic put/get/remove, rehashing, iteration
+  * and concurrent insertions.
+  */
+ public class ConcurrentLongLongPairHashMapTest {
+
+     /** Each of these constructor argument combinations must be rejected. */
+     @Test
+     public void testConstructor() {
+         try {
+             new ConcurrentLongLongPairHashMap(0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongLongPairHashMap(16, 0);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             new ConcurrentLongLongPairHashMap(4, 8);
+             fail("should have thrown exception");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+     }
+
+     /** Basic put/get/remove sequence, checking the reported size after every step. */
+     @Test
+     public void simpleInsertions() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16);
+
+         assertTrue(map.isEmpty());
+         assertTrue(map.put(1, 1, 11, 11));
+         assertFalse(map.isEmpty());
+
+         assertTrue(map.put(2, 2, 22, 22));
+         assertTrue(map.put(3, 3, 33, 33));
+
+         assertEquals(3, map.size());
+
+         assertEquals(new LongPair(11, 11), map.get(1, 1));
+         assertEquals(3, map.size());
+
+         assertTrue(map.remove(1, 1));
+         assertEquals(2, map.size());
+         // A missing key is reported as null
+         assertEquals(null, map.get(1, 1));
+         assertEquals(null, map.get(5, 5));
+         assertEquals(2, map.size());
+
+         assertTrue(map.put(1, 1, 11, 11));
+         assertEquals(3, map.size());
+         // Overwriting an existing key does not change the size
+         assertTrue(map.put(1, 1, 111, 111));
+         assertEquals(3, map.size());
+     }
+
+     /** The conditional remove only succeeds when both the key pair and the value pair match. */
+     @Test
+     public void testRemove() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+         assertTrue(map.isEmpty());
+         assertTrue(map.put(1, 1, 11, 11));
+         assertFalse(map.isEmpty());
+
+         assertFalse(map.remove(0, 0));
+         assertFalse(map.remove(1, 1, 111, 111));
+
+         assertFalse(map.isEmpty());
+         assertTrue(map.remove(1, 1, 11, 11));
+         assertTrue(map.isEmpty());
+     }
+
+     /** Overwrites and repeated removes must not push the used-bucket count above 1 or below 0. */
+     @Test
+     public void testNegativeUsedBucketCount() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+         map.put(0, 0, 0, 0);
+         assertEquals(1, map.getUsedBucketCount());
+         map.put(0, 0, 1, 1);
+         assertEquals(1, map.getUsedBucketCount());
+         map.remove(0, 0);
+         assertEquals(0, map.getUsedBucketCount());
+         map.remove(0, 0);
+         assertEquals(0, map.getUsedBucketCount());
+     }
+
+     /** Inserting {@code n} items into a map whose capacity is {@code n} must double the capacity. */
+     @Test
+     public void testRehashing() {
+         int n = 16;
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+         assertEquals(n, map.capacity());
+         assertEquals(0, map.size());
+
+         for (int i = 0; i < n; i++) {
+             map.put(i, i, i, i);
+         }
+
+         assertEquals(2 * n, map.capacity());
+         assertEquals(n, map.size());
+     }
+
+     /** Same as {@link #testRehashing()}, but with a batch of inserts and deletes done beforehand. */
+     @Test
+     public void testRehashingWithDeletes() {
+         int n = 16;
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(n / 2, 1);
+         assertEquals(n, map.capacity());
+         assertEquals(0, map.size());
+
+         for (int i = 0; i < n / 2; i++) {
+             map.put(i, i, i, i);
+         }
+
+         for (int i = 0; i < n / 2; i++) {
+             map.remove(i, i);
+         }
+
+         for (int i = n; i < (2 * n); i++) {
+             map.put(i, i, i, i);
+         }
+
+         assertEquals(2 * n, map.capacity());
+         assertEquals(n, map.size());
+     }
+
+     /** 16 threads insert random key pairs concurrently; no insertion may be lost. */
+     @Test
+     public void concurrentInsertions() throws Throwable {
+         runConcurrentInsertions(false);
+     }
+
+     /**
+      * Same workload as {@link #concurrentInsertions()}, but every worker also reads back each key
+      * pair right after writing it, while the other workers keep inserting.
+      */
+     @Test
+     public void concurrentInsertionsAndReads() throws Throwable {
+         runConcurrentInsertions(true);
+     }
+
+     /**
+      * Inserts random non-negative key pairs from several threads into a fresh map and checks the
+      * resulting size.
+      *
+      * @param readBack when {@code true}, each worker asserts that {@code get(key1, key2)} returns
+      *            the value pair it has just put
+      * @throws Exception if a worker task fails or the calling thread is interrupted
+      */
+     private static void runConcurrentInsertions(boolean readBack) throws Exception {
+         final ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+         final int nThreads = 16;
+         final int insertsPerThread = 100_000;
+         final long value = 55;
+
+         ExecutorService executor = Executors.newCachedThreadPool();
+         try {
+             List<Future<?>> futures = new ArrayList<>();
+             for (int i = 0; i < nThreads; i++) {
+                 final int threadIdx = i;
+
+                 futures.add(executor.submit(() -> {
+                     Random random = new Random();
+
+                     for (int j = 0; j < insertsPerThread; j++) {
+                         // Mask the sign bit instead of using Math.abs(): Math.abs(Long.MIN_VALUE)
+                         // is negative, and the map rejects negative keys.
+                         // Each half is then rounded down to a multiple of (threadIdx + 1). This
+                         // does not strictly guarantee uniqueness: the size assertion below relies
+                         // on collisions between random key pairs being extremely unlikely.
+                         long key1 = random.nextLong() & Long.MAX_VALUE;
+                         key1 -= key1 % (threadIdx + 1);
+
+                         long key2 = random.nextLong() & Long.MAX_VALUE;
+                         key2 -= key2 % (threadIdx + 1);
+
+                         map.put(key1, key2, value, value);
+                         if (readBack) {
+                             assertEquals(new LongPair(value, value), map.get(key1, key2));
+                         }
+                     }
+                 }));
+             }
+
+             // Propagates any failure raised inside a worker
+             for (Future<?> future : futures) {
+                 future.get();
+             }
+
+             assertEquals(insertsPerThread * nThreads, map.size());
+         } finally {
+             // Release the pool even when a worker or the size assertion fails
+             executor.shutdown();
+         }
+     }
+
+     /** {@code keys()} and {@code values()} must track puts, overwrites, removes and clear. */
+     @Test
+     public void testIteration() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+
+         assertEquals(Collections.emptyList(), map.keys());
+         assertEquals(Collections.emptyList(), map.values());
+
+         map.put(0, 0, 0, 0);
+
+         assertEquals(Lists.newArrayList(new LongPair(0, 0)), map.keys());
+         assertEquals(Lists.newArrayList(new LongPair(0, 0)), map.values());
+
+         map.remove(0, 0);
+
+         assertEquals(Collections.emptyList(), map.keys());
+         assertEquals(Collections.emptyList(), map.values());
+
+         map.put(0, 0, 0, 0);
+         map.put(1, 1, 11, 11);
+         map.put(2, 2, 22, 22);
+
+         // Sort before comparing, so the checks do not depend on iteration order
+         List<LongPair> keys = map.keys();
+         Collections.sort(keys);
+         assertEquals(Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)), keys);
+
+         List<LongPair> values = map.values();
+         Collections.sort(values);
+         assertEquals(Lists.newArrayList(new LongPair(0, 0), new LongPair(11, 11), new LongPair(22, 22)), values);
+
+         map.put(1, 1, 111, 111);
+
+         keys = map.keys();
+         Collections.sort(keys);
+         assertEquals(Lists.newArrayList(new LongPair(0, 0), new LongPair(1, 1), new LongPair(2, 2)), keys);
+
+         values = map.values();
+         Collections.sort(values);
+         assertEquals(Lists.newArrayList(new LongPair(0, 0), new LongPair(22, 22), new LongPair(111, 111)), values);
+
+         map.clear();
+         assertTrue(map.isEmpty());
+     }
+
+     /** {@code putIfAbsent} must report false and keep the existing value when the key is present. */
+     @Test
+     public void testPutIfAbsent() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap();
+         assertTrue(map.putIfAbsent(1, 1, 11, 11));
+         assertEquals(new LongPair(11, 11), map.get(1, 1));
+
+         assertFalse(map.putIfAbsent(1, 1, 111, 111));
+         assertEquals(new LongPair(11, 11), map.get(1, 1));
+     }
+
+     /** Every operation taking a key pair must reject a negative first key. */
+     @Test
+     public void testIvalidKeys() {
+         ConcurrentLongLongPairHashMap map = new ConcurrentLongLongPairHashMap(16, 1);
+
+         try {
+             map.put(-5, 3, 4, 4);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.get(-1, 0);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.containsKey(-1, 0);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+
+         try {
+             map.putIfAbsent(-1, 1, 1, 1);
+             fail("should have failed");
+         } catch (IllegalArgumentException e) {
+             // ok
+         }
+     }
+
+     /** {@code asMap()} must expose the same entries as an equivalent {@link Map}. */
+     @Test
+     public void testAsMap() {
+         ConcurrentLongLongPairHashMap lmap = new ConcurrentLongLongPairHashMap(16, 1);
+         lmap.put(1, 1, 11, 11);
+         lmap.put(2, 2, 22, 22);
+         lmap.put(3, 3, 33, 33);
+
+         Map<LongPair, LongPair> map = Maps.newTreeMap();
+         map.put(new LongPair(1, 1), new LongPair(11, 11));
+         map.put(new LongPair(2, 2), new LongPair(22, 22));
+         map.put(new LongPair(3, 3), new LongPair(33, 33));
+
+         assertEquals(map, lmap.asMap());
+     }
+ }