You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/08/16 16:54:08 UTC
[bookkeeper] branch master updated: ISSUE #1606: Fixed race
condition during expansion of concurrent open hash maps
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a7e66e1 ISSUE #1606: Fixed race condition during expansion of concurrent open hash maps
a7e66e1 is described below
commit a7e66e1dd84ff17c918dbbf54e5baa18ff9c9c9b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Aug 16 09:53:57 2018 -0700
ISSUE #1606: Fixed race condition during expansion of concurrent open hash maps
### Motivation
As reported in #1606, there is a race condition in the concurrent open hash map implementations. The race happens when a map is re-hashed after an expansion and the new arrays replace the old ones.
The race itself is that a thread doing a `get()` on the map first checks the current `capacity` of the map, uses that to compute the bucket, and then attempts an optimistic read of the value in that bucket.
This assumes that the `capacity` update becomes visible only after the `values` array has already been swapped, but that is not always the case in the current code.
### Changes
* Use the `volatile` qualifier for `capacity` and the `values` arrays to ensure that the ordering of memory reads is respected by the compiler
* In rehashing, update `capacity` after `values`
Author: Matteo Merli <mm...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1607 from merlimat/fix-concurrent-maps, closes #1606
---
.../util/collections/ConcurrentLongHashMap.java | 10 ++--
.../util/collections/ConcurrentLongHashSet.java | 8 ++-
.../collections/ConcurrentLongLongHashMap.java | 8 ++-
.../collections/ConcurrentLongLongPairHashMap.java | 8 ++-
.../util/collections/ConcurrentOpenHashMap.java | 8 ++-
.../util/collections/ConcurrentOpenHashSet.java | 8 ++-
.../collections/ConcurrentLongHashMapTest.java | 68 ++++++++++++++++++++++
7 files changed, 99 insertions(+), 19 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index dff0af4..d98d3a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -215,10 +215,10 @@ public class ConcurrentLongHashMap<V> {
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section<V> extends StampedLock {
- private long[] keys;
- private V[] values;
+ private volatile long[] keys;
+ private volatile V[] values;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -507,10 +507,12 @@ public class ConcurrentLongHashMap<V> {
}
}
- capacity = newCapacity;
keys = newKeys;
values = newValues;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * MapFillFactor);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index fbf374e..b0edaad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -166,9 +166,9 @@ public class ConcurrentLongHashSet {
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
// Keys and values are stored interleaved in the table array
- private long[] table;
+ private volatile long[] table;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -375,9 +375,11 @@ public class ConcurrentLongHashSet {
}
}
- capacity = newCapacity;
table = newTable;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * SetFillFactor);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index ecc890b..f3e3d56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -285,9 +285,9 @@ public class ConcurrentLongLongHashMap {
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
// Keys and values are stored interleaved in the table array
- private long[] table;
+ private volatile long[] table;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -682,9 +682,11 @@ public class ConcurrentLongLongHashMap {
}
}
- capacity = newCapacity;
table = newTable;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * MapFillFactor);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index 146b904..42cb04b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -223,9 +223,9 @@ public class ConcurrentLongLongPairHashMap {
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
// Keys and values are stored interleaved in the table array
- private long[] table;
+ private volatile long[] table;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -470,9 +470,11 @@ public class ConcurrentLongLongPairHashMap {
}
}
- capacity = newCapacity;
table = newTable;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * MapFillFactor);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index 77f3c11..475b70e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -194,9 +194,9 @@ public class ConcurrentOpenHashMap<K, V> {
@SuppressWarnings("serial")
private static final class Section<K, V> extends StampedLock {
// Keys and values are stored interleaved in the table array
- private Object[] table;
+ private volatile Object[] table;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -449,9 +449,11 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- capacity = newCapacity;
table = newTable;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * MapFillFactor);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index 3420eda..9cf6d22 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -148,9 +148,9 @@ public class ConcurrentOpenHashSet<V> {
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section<V> extends StampedLock {
- private V[] values;
+ private volatile V[] values;
- private int capacity;
+ private volatile int capacity;
private volatile int size;
private int usedBuckets;
private int resizeThreshold;
@@ -371,9 +371,11 @@ public class ConcurrentOpenHashSet<V> {
}
}
- capacity = newCapacity;
values = newValues;
usedBuckets = size;
+ // Capacity needs to be updated after the values, so that we won't see
+ // a capacity value bigger than the actual array size
+ capacity = newCapacity;
resizeThreshold = (int) (capacity * MapFillFactor);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index 06c7667..cec38cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -256,6 +257,73 @@ public class ConcurrentLongHashMapTest {
}
@Test
+ public void stressConcurrentInsertionsAndReads() throws Throwable {
+ ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ final int writeThreads = 16;
+ final int readThreads = 16;
+ final int n = 1_000_000;
+ String value = "value";
+
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); // one party per writer and reader task
+ List<Future<?>> futures = new ArrayList<>();
+
+
+ for (int i = 0; i < writeThreads; i++) {
+ final int threadIdx = i;
+
+ futures.add(executor.submit(() -> {
+ Random random = new Random(threadIdx);
+
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int j = 0; j < n; j++) {
+ long key = random.nextLong();
+ // Round the random key toward zero to a multiple of (threadIdx + 1); intended to keep keys unique
+ key -= key % (threadIdx + 1);
+
+ map.put(key, value);
+ }
+ }));
+ }
+
+ for (int i = 0; i < readThreads; i++) {
+ final int threadIdx = i;
+
+ futures.add(executor.submit(() -> {
+ Random random = new Random(threadIdx);
+
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int j = 0; j < n; j++) {
+ long key = random.nextLong();
+ // Same seed and transformation as writer threadIdx, so this reader probes that writer's key sequence
+ key -= key % (threadIdx + 1);
+
+ map.get(key);
+ }
+ }));
+ }
+
+ for (Future<?> future : futures) {
+ future.get(); // blocks until the task ends; a failure inside the task surfaces here
+ }
+
+ assertEquals(map.size(), n * writeThreads); // expects every put to have added a distinct key
+
+ executor.shutdown();
+ }
+
+ @Test
public void testIteration() {
ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();