Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/08/16 16:54:58 UTC

[bookkeeper] branch branch-4.8 updated: ISSUE #1606: Fixed race condition during expansion of concurrent open hash maps

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.8
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.8 by this push:
     new 07bb42a  ISSUE #1606: Fixed race condition during expansion of concurrent open hash maps
07bb42a is described below

commit 07bb42add176423e83b7db5777ac5f67a563d9ea
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Aug 16 09:53:57 2018 -0700

    ISSUE #1606: Fixed race condition during expansion of concurrent open hash maps
    
    ### Motivation
    
    As reported in #1606, there is a race condition in the concurrent open hash map implementations. The race happens when a map gets re-hashed during expansion and the new arrays are swapped in for the old ones.
    
    The race itself is that a thread doing a `get()` on the map first checks the current `capacity` of the map, uses it to compute the bucket index, and then attempts an optimistic read of the value in that bucket.
    
    This assumes that the update to `capacity` becomes visible only after the `values` array has already been swapped, but the current code does not guarantee that ordering.
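
    To make the race concrete, here is a minimal, self-contained sketch of the reader-side pattern described above. The class name, bucket computation, and fallback path are hypothetical simplifications for illustration; the real code lives in the `Section` classes shown in the diff below:

        import java.util.concurrent.locks.StampedLock;

        // Hypothetical, simplified map section *before* the fix:
        // none of the fields are volatile, and the rehash published
        // capacity before values.
        class RacySection<V> extends StampedLock {
            private long[] keys;
            private V[] values;
            private int capacity;

            @SuppressWarnings("unchecked")
            RacySection(int capacity) {
                this.capacity = capacity;
                this.keys = new long[capacity];
                this.values = (V[]) new Object[capacity];
            }

            V get(long key, int keyHash) {
                long stamp = tryOptimisticRead();
                // 1. Read the current capacity and derive the bucket index from it
                int bucket = keyHash & (capacity - 1);
                // 2. Optimistically read the bucket. If capacity already holds the
                //    new (larger) value while values still points to the old
                //    (smaller) array, this read can index past the end of the
                //    array before validate() ever gets a chance to detect the race.
                long storedKey = keys[bucket];
                V storedValue = values[bucket];
                if (!validate(stamp)) {
                    // A writer raced with us: fall back to a read lock and retry
                    // (omitted in this sketch).
                }
                return storedKey == key ? storedValue : null;
            }
        }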
    
    ### Changes
     * Use the `volatile` qualifier for `capacity` and for the backing arrays (`keys`, `values`, `table`), so that the compiler and runtime preserve the ordering of the memory reads
     * When rehashing, update `capacity` after `values` (see the ordering sketch below)
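
    With both changes in place, the ordering guarantee can be sketched as follows (this is the invariant being relied upon, not literal BookKeeper code):

        // Writer side (rehash), after this change:
        values = newValues;       // volatile write: publish the new array first
        capacity = newCapacity;   // volatile write: publish the new size afterwards

        // Reader side (optimistic get):
        int bucket = keyHash & (capacity - 1);   // volatile read of capacity
        V storedValue = values[bucket];          // volatile read of values

        // Because capacity is written last and both fields are volatile, a reader
        // that observes the new capacity is guaranteed (by happens-before) to also
        // observe the new values array, so the bucket index can never exceed the
        // length of the array it is applied to.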
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1607 from merlimat/fix-concurrent-maps, closes #1606
    
    (cherry picked from commit a7e66e1dd84ff17c918dbbf54e5baa18ff9c9c9b)
    Signed-off-by: Matteo Merli <mm...@apache.org>
---
 .../util/collections/ConcurrentLongHashMap.java    | 10 ++--
 .../util/collections/ConcurrentLongHashSet.java    |  8 ++-
 .../collections/ConcurrentLongLongHashMap.java     |  8 ++-
 .../collections/ConcurrentLongLongPairHashMap.java |  8 ++-
 .../util/collections/ConcurrentOpenHashMap.java    |  8 ++-
 .../util/collections/ConcurrentOpenHashSet.java    |  8 ++-
 .../collections/ConcurrentLongHashMapTest.java     | 68 ++++++++++++++++++++++
 7 files changed, 99 insertions(+), 19 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index dff0af4..d98d3a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -215,10 +215,10 @@ public class ConcurrentLongHashMap<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private long[] keys;
-        private V[] values;
+        private volatile long[] keys;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -507,10 +507,12 @@ public class ConcurrentLongHashMap<V> {
                 }
             }
 
-            capacity = newCapacity;
             keys = newKeys;
             values = newValues;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index fbf374e..b0edaad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -166,9 +166,9 @@ public class ConcurrentLongHashSet {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -375,9 +375,11 @@ public class ConcurrentLongHashSet {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * SetFillFactor);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index ecc890b..f3e3d56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -285,9 +285,9 @@ public class ConcurrentLongLongHashMap {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -682,9 +682,11 @@ public class ConcurrentLongLongHashMap {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index 146b904..42cb04b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -223,9 +223,9 @@ public class ConcurrentLongLongPairHashMap {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -470,9 +470,11 @@ public class ConcurrentLongLongPairHashMap {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index 77f3c11..475b70e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -194,9 +194,9 @@ public class ConcurrentOpenHashMap<K, V> {
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private Object[] table;
+        private volatile Object[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -449,9 +449,11 @@ public class ConcurrentOpenHashMap<K, V> {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index 3420eda..9cf6d22 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -148,9 +148,9 @@ public class ConcurrentOpenHashSet<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private V[] values;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -371,9 +371,11 @@ public class ConcurrentOpenHashSet<V> {
                 }
             }
 
-            capacity = newCapacity;
             values = newValues;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * MapFillFactor);
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index 06c7667..cec38cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -256,6 +257,73 @@ public class ConcurrentLongHashMapTest {
     }
 
     @Test
+    public void stressConcurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int writeThreads = 16;
+        final int readThreads = 16;
+        final int n = 1_000_000;
+        String value = "value";
+
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+
+
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.put(key, value);
+                }
+            }));
+        }
+
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.get(key);
+                }
+            }));
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.size(), n * writeThreads);
+
+        executor.shutdown();
+    }
+
+    @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();