You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2014/12/10 10:00:55 UTC

svn commit: r1644352 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java

Author: thomasm
Date: Wed Dec 10 09:00:55 2014
New Revision: 1644352

URL: http://svn.apache.org/r1644352
Log:
OAK-2332 LIRS cache: deadlock if a value loader access the cache

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java?rev=1644352&r1=1644351&r2=1644352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java Wed Dec 10 09:00:55 2014
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
@@ -689,6 +690,11 @@ public class CacheLIRS<K, V> implements
          * The number of times any item was moved to the top of the stack.
          */
         private int stackMoveCounter;
+        
+        /**
+         * Whether the current segment is currently calling a value loader.
+         */
+        private AtomicBoolean isLoading = new AtomicBoolean();
 
         /**
          * Create a new cache.
@@ -834,30 +840,55 @@ public class CacheLIRS<K, V> implements
         }
         
         V get(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
-            // avoid synchronization if it's in the cache
-            V value = get(key, hash);
-            if (value != null) {
-                return value;
-            }
-            synchronized (this) {
-                value = get(key, hash);
+            // we can not synchronize on a per-segment object while loading, as
+            // the value loader could access the cache (for example, using put,
+            // or another get with a loader), which might result in a deadlock
+            
+            // for at most 100 ms (100 x 1 ms), we avoid concurrent loading of
+            // multiple values in the same segment
+            for (int i = 0; i < 100; i++) {
+                V value = get(key, hash);
                 if (value != null) {
                     return value;
                 }
-                long start = System.nanoTime();
-                try {
-                    value = valueLoader.call();
-                    loadSuccessCount++;
-                } catch (Exception e) {
-                    loadExceptionCount++;
-                    throw new ExecutionException(e);
-                } finally {
-                    long time = System.nanoTime() - start;
-                    totalLoadTime += time;
+                if (!isLoading.getAndSet(true)) {
+                    value = load(key, hash, valueLoader);
+                    isLoading.set(false);
+                    synchronized (isLoading) {
+                        // notify the other thread
+                        isLoading.notifyAll();
+                    }
+                } else {
+                    // wait a bit, but at most until the other thread completed
+                    // loading
+                    synchronized (isLoading) {
+                        try {
+                            isLoading.wait(1);
+                        } catch (InterruptedException e) {
+                            // ignore
+                        }
+                    }
                 }
-                put(key, hash, value, cache.sizeOf(key, value));
-                return value;
             }
+            // give up (that means, the same value might be loaded concurrently)
+            return load(key, hash, valueLoader);
+        }
+            
+        V load(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
+            V value;
+            long start = System.nanoTime();
+            try {
+                value = valueLoader.call();
+                loadSuccessCount++;
+            } catch (Exception e) {
+                loadExceptionCount++;
+                throw new ExecutionException(e);
+            } finally {
+                long time = System.nanoTime() - start;
+                totalLoadTime += time;
+            }
+            put(key, hash, value, cache.sizeOf(key, value));
+            return value;
         }
         
         V get(K key, int hash, CacheLoader<K, V> loader) throws ExecutionException {

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java?rev=1644352&r1=1644351&r2=1644352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java Wed Dec 10 09:00:55 2014
@@ -18,8 +18,12 @@
  */
 package org.apache.jackrabbit.oak.cache;
 
+import static org.junit.Assert.assertFalse;
+
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Test;
@@ -28,6 +32,58 @@ import org.junit.Test;
  * Tests the LIRS cache by concurrently reading and writing.
  */
 public class ConcurrentTest {
+    
+    @Test
+    public void testCacheAccessInLoaderDeadlock() throws Exception {
+        final Random r = new Random(1);
+        final CacheLIRS<Integer, Integer> cache = new CacheLIRS.Builder().
+                maximumWeight(100).averageWeight(10).build();
+        final Exception[] ex = new Exception[1];
+        final int entryCount = 100;
+        int size = 3;
+        Thread[] threads = new Thread[size];
+        final AtomicBoolean stop = new AtomicBoolean();
+        for (int i = 0; i < size; i++) {
+            Thread t = new Thread() {
+                @Override
+                public void run() {
+                    Callable<Integer> callable = new Callable<Integer>() {
+                        @Override
+                        public Integer call() throws ExecutionException {
+                            cache.get(r.nextInt(entryCount));
+                            return 1;
+                        }
+                    };
+                    while (!stop.get()) {
+                        Integer key = r.nextInt(entryCount);
+                        try {
+                            cache.get(key, callable);
+                        } catch (Exception e) {
+                            ex[0] = e;
+                        }
+                        cache.remove(key);
+                    }
+                }
+            };
+            t.start();
+            threads[i] = t;
+        }
+        // test for 100 ms
+        Thread.sleep(100);
+        stop.set(true);
+        for (Thread t : threads) {
+            t.join(1000);
+            // if the thread is still alive after 1 second, we assume
+            // there is a deadlock - we just let the threads alive,
+            // but report a failure (what else could we do?)
+            if (t.isAlive()) {
+                assertFalse("Deadlock detected!", t.isAlive());
+            }
+        }
+        if (ex[0] != null) {
+            throw ex[0];
+        }
+    }
 
     @Test
     public void testRandomOperations() throws Exception {