You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2014/12/10 10:00:55 UTC
svn commit: r1644352 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
Author: thomasm
Date: Wed Dec 10 09:00:55 2014
New Revision: 1644352
URL: http://svn.apache.org/r1644352
Log:
OAK-2332 LIRS cache: deadlock if a value loader accesses the cache
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java?rev=1644352&r1=1644351&r2=1644352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java Wed Dec 10 09:00:55 2014
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -689,6 +690,11 @@ public class CacheLIRS<K, V> implements
* The number of times any item was moved to the top of the stack.
*/
private int stackMoveCounter;
+
+ /**
+ * Whether the current segment is currently calling a value loader.
+ */
+ private AtomicBoolean isLoading = new AtomicBoolean();
/**
* Create a new cache.
@@ -834,30 +840,55 @@ public class CacheLIRS<K, V> implements
}
V get(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
- // avoid synchronization if it's in the cache
- V value = get(key, hash);
- if (value != null) {
- return value;
- }
- synchronized (this) {
- value = get(key, hash);
+ // We cannot hold a per-segment lock while loading: the value loader
+ // may itself access the cache (for example using put, or another get
+ // with a loader), which could result in a deadlock.
+ //
+ // Instead, for at most 100 attempts (each waiting up to 1 ms), we try
+ // to avoid concurrent loading of multiple values in the same segment.
+ for (int i = 0; i < 100; i++) {
+     V value = get(key, hash);
if (value != null) {
return value;
}
- long start = System.nanoTime();
- try {
- value = valueLoader.call();
- loadSuccessCount++;
- } catch (Exception e) {
- loadExceptionCount++;
- throw new ExecutionException(e);
- } finally {
- long time = System.nanoTime() - start;
- totalLoadTime += time;
+     if (!isLoading.getAndSet(true)) {
+         // this thread acquired the loading flag
+         try {
+             // return the loaded value directly: a second lookup could
+             // miss if the entry was removed or evicted in the meantime,
+             // which would trigger another (redundant) load
+             return load(key, hash, valueLoader);
+         } finally {
+             // always release the flag, even if the loader failed;
+             // otherwise every later caller would wait for the full
+             // timeout before loading
+             isLoading.set(false);
+             synchronized (isLoading) {
+                 // wake up the threads waiting below
+                 isLoading.notifyAll();
+             }
+         }
+     }
+     // another load is in progress in this segment: wait a bit, but
+     // at most until the other thread completed loading
+     synchronized (isLoading) {
+         try {
+             isLoading.wait(1);
+         } catch (InterruptedException e) {
+             // keep the interrupt status for the caller and stop
+             // waiting; the value is loaded directly below
+             Thread.currentThread().interrupt();
+             break;
+         }
+     }
- put(key, hash, value, cache.sizeOf(key, value));
- return value;
}
+ // give up (that means, the same value might be loaded concurrently)
+ return load(key, hash, valueLoader);
+ }
+
+ V load(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
+ V value;
+ long start = System.nanoTime();
+ try {
+ value = valueLoader.call();
+ loadSuccessCount++;
+ } catch (Exception e) {
+ loadExceptionCount++;
+ throw new ExecutionException(e);
+ } finally {
+ long time = System.nanoTime() - start;
+ totalLoadTime += time;
+ }
+ put(key, hash, value, cache.sizeOf(key, value));
+ return value;
}
V get(K key, int hash, CacheLoader<K, V> loader) throws ExecutionException {
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java?rev=1644352&r1=1644351&r2=1644352&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java Wed Dec 10 09:00:55 2014
@@ -18,8 +18,12 @@
*/
package org.apache.jackrabbit.oak.cache;
+import static org.junit.Assert.assertFalse;
+
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
@@ -28,6 +32,58 @@ import org.junit.Test;
* Tests the LIRS cache by concurrently reading and writing.
*/
public class ConcurrentTest {
+
+ /**
+  * Runs a few threads that load values through a loader which itself
+  * reads from the same cache, and fails if any of the threads does not
+  * terminate (which is taken as a sign of a deadlock).
+  */
+ @Test
+ public void testCacheAccessInLoaderDeadlock() throws Exception {
+     final Random r = new Random(1);
+     final CacheLIRS<Integer, Integer> cache = new CacheLIRS.Builder().
+             maximumWeight(100).averageWeight(10).build();
+     // holds an exception caught in one of the worker threads, if any
+     final Exception[] ex = new Exception[1];
+     final int entryCount = 100;
+     int size = 3;
+     Thread[] threads = new Thread[size];
+     final AtomicBoolean stop = new AtomicBoolean();
+     for (int i = 0; i < size; i++) {
+         Thread t = new Thread() {
+             @Override
+             public void run() {
+                 // a value loader that accesses the cache while loading
+                 Callable<Integer> callable = new Callable<Integer>() {
+                     @Override
+                     public Integer call() throws ExecutionException {
+                         cache.get(r.nextInt(entryCount));
+                         return 1;
+                     }
+                 };
+                 while (!stop.get()) {
+                     Integer key = r.nextInt(entryCount);
+                     try {
+                         cache.get(key, callable);
+                     } catch (Exception e) {
+                         ex[0] = e;
+                     }
+                     // remove the entry so that it has to be loaded again
+                     cache.remove(key);
+                 }
+             }
+         };
+         t.start();
+         threads[i] = t;
+     }
+     // test for 100 ms
+     Thread.sleep(100);
+     stop.set(true);
+     for (Thread t : threads) {
+         t.join(1000);
+         // if the thread is still alive after 1 second, we assume
+         // there is a deadlock - we leave the threads running,
+         // but report a failure (what else could we do?)
+         assertFalse("Deadlock detected!", t.isAlive());
+     }
+     if (ex[0] != null) {
+         throw ex[0];
+     }
+ }
@Test
public void testRandomOperations() throws Exception {