You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2015/06/04 14:49:49 UTC
svn commit: r1683538 - in /jackrabbit/oak/branches/1.2/oak-core/src:
main/java/org/apache/jackrabbit/oak/cache/
main/java/org/apache/jackrabbit/oak/plugins/document/
test/java/org/apache/jackrabbit/oak/cache/
Author: thomasm
Date: Thu Jun 4 12:49:49 2015
New Revision: 1683538
URL: http://svn.apache.org/r1683538
Log:
OAK-2957 LIRS cache: config options for segment count and stack move distance
OAK-2830 LIRS cache: avoid concurrent loading of the same entry if loading is slow
Modified:
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java Thu Jun 4 12:49:49 2015
@@ -24,9 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
@@ -96,6 +96,15 @@ public class CacheLIRS<K, V> implements
private final CacheLoader<K, V> loader;
/**
+ * A concurrent hash map of keys where loading is in progress. Key: the
+ * cache key. Value: a synchronization object. The threads that wait for the
+ * value to be loaded need to wait on the synchronization object. The
+ * loading thread will notify all waiting threads once loading is done.
+ */
+ final ConcurrentHashMap<K, Object> loadingInProgress =
+ new ConcurrentHashMap<K, Object>();
+
+ /**
* Create a new cache with the given number of entries, and the default
* settings (an average size of 1 per entry, 16 segments, and stack move
* distance equals to the maximum number of entries divided by 100).
@@ -692,11 +701,6 @@ public class CacheLIRS<K, V> implements
* The number of times any item was moved to the top of the stack.
*/
private int stackMoveCounter;
-
- /**
- * Whether the current segment is currently calling a value loader.
- */
- private AtomicBoolean isLoading = new AtomicBoolean();
/**
* Create a new cache.
@@ -842,38 +846,62 @@ public class CacheLIRS<K, V> implements
}
V get(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
- // we can not synchronize on a per-segment object while loading, as
- // the value loader could access the cache (for example, using put,
- // or another get with a loader), which might result in a deadlock
-
- // for at most 100 ms (100 x 1 ms), we avoid concurrent loading of
- // multiple values in the same segment
- for (int i = 0; i < 100; i++) {
+ // we can not synchronize on a per-segment object while loading,
+ // because we don't want to block cache access while loading, and
+ // because the value loader could access the cache (for example,
+ // using put, or another get with a loader), which might result in a
+ // deadlock
+ // we loop here because another thread might load the value,
+ // but loading might fail there, so we might need to repeat this
+ while (true) {
V value = get(key, hash);
+ // the (hopefully) normal case
if (value != null) {
return value;
}
- if (!isLoading.getAndSet(true)) {
- value = load(key, hash, valueLoader);
- isLoading.set(false);
- synchronized (isLoading) {
- // notify the other thread
- isLoading.notifyAll();
- }
- } else {
- // wait a bit, but at most until the other thread completed
- // loading
- synchronized (isLoading) {
+ ConcurrentHashMap<K, Object> loading = cache.loadingInProgress;
+ // the object we have to wait for in case another thread loads
+ // this value
+ Object alreadyLoading;
+ // synchronize on this object even before we put it in the
+ // loading map, so that all other threads that get this object
+ // can synchronize on it and wait for it
+ Object loadNow = new Object();
+ // we synchronize a bit early here, but that's fine (we don't
+ // optimize for the case where loading is extremely quick)
+ synchronized (loadNow) {
+ alreadyLoading = loading.putIfAbsent(key, loadNow);
+ if (alreadyLoading == null) {
+ // we are loading ourselves
try {
- isLoading.wait(1);
- } catch (InterruptedException e) {
- // ignore
+ return load(key, hash, valueLoader);
+ } finally {
+ loading.remove(key);
+ // notify other threads
+ loadNow.notifyAll();
}
}
+ }
+ // another thread is (or was) already loading
+ synchronized (alreadyLoading) {
+ // loading might have been finished, so check again
+ Object alreadyLoading2 = loading.get(key);
+ if (alreadyLoading2 != alreadyLoading) {
+ // loading has completed before we could synchronize,
+ // so we repeat
+ continue;
+ }
+ // still loading: wait
+ try {
+ // we could wait longer than 10 ms, but we keep the timeout
+ // short in case notify is not called for some weird reason
+ // (for example out of memory)
+ alreadyLoading.wait(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
}
- // give up (that means, the same value might be loaded concurrently)
- return load(key, hash, valueLoader);
}
V load(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
@@ -1375,6 +1403,8 @@ public class CacheLIRS<K, V> implements
private Weigher<?, ?> weigher;
private long maxWeight;
private int averageWeight = 100;
+ private int segmentCount = 16;
+ private int stackMoveDistance = 16;
public Builder recordStats() {
return this;
@@ -1401,6 +1431,24 @@ public class CacheLIRS<K, V> implements
return this;
}
+ public Builder segmentCount(int segmentCount) {
+ if (Integer.bitCount(segmentCount) != 1 || segmentCount < 0 || segmentCount > 65536) {
+ LOG.warn("Illegal segment count: " + segmentCount + ", using 16");
+ segmentCount = 16;
+ }
+ this.segmentCount = segmentCount;
+ return this;
+ }
+
+ public Builder stackMoveDistance(int stackMoveDistance) {
+ if (stackMoveDistance < 0) {
+ LOG.warn("Illegal stack move distance: " + stackMoveDistance + ", using 16");
+ stackMoveDistance = 16;
+ }
+ this.stackMoveDistance = stackMoveDistance;
+ return this;
+ }
+
public <K, V> CacheLIRS<K, V> build() {
return build(null);
}
@@ -1409,7 +1457,8 @@ public class CacheLIRS<K, V> implements
CacheLoader<K, V> cacheLoader) {
@SuppressWarnings("unchecked")
Weigher<K, V> w = (Weigher<K, V>) weigher;
- return new CacheLIRS<K, V>(w, maxWeight, averageWeight, 16, 16, cacheLoader);
+ return new CacheLIRS<K, V>(w, maxWeight, averageWeight,
+ segmentCount, stackMoveDistance, cacheLoader);
}
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Thu Jun 4 12:49:49 2015
@@ -83,10 +83,14 @@ public class DocumentMK implements Micro
"oak.documentMK.manyChildren", 50);
/**
- * Enable the LIRS cache.
+ * Enable or disable the LIRS cache (null to use the default setting for this configuration).
*/
- static final boolean LIRS_CACHE = Boolean.parseBoolean(
- System.getProperty("oak.documentMK.lirsCache", "false"));
+ static final Boolean LIRS_CACHE;
+
+ static {
+ String s = System.getProperty("oak.documentMK.lirsCache");
+ LIRS_CACHE = s == null ? null : Boolean.parseBoolean(s);
+ }
/**
* Enable fast diff operations.
@@ -503,6 +507,8 @@ public class DocumentMK implements Micro
public static final int DEFAULT_CHILDREN_CACHE_PERCENTAGE = 10;
public static final int DEFAULT_DIFF_CACHE_PERCENTAGE = 5;
public static final int DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE = 3;
+ public static final int DEFAULT_CACHE_SEGMENT_COUNT = 16;
+ public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
private DocumentNodeStore nodeStore;
private DocumentStore documentStore;
private DiffCache diffCache;
@@ -519,6 +525,8 @@ public class DocumentMK implements Micro
private int childrenCachePercentage = DEFAULT_CHILDREN_CACHE_PERCENTAGE;
private int diffCachePercentage = DEFAULT_DIFF_CACHE_PERCENTAGE;
private int docChildrenCachePercentage = DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE;
+ private int cacheSegmentCount = DEFAULT_CACHE_SEGMENT_COUNT;
+ private int cacheStackMoveDistance = DEFAULT_CACHE_STACK_MOVE_DISTANCE;
private boolean useSimpleRevision;
private long splitDocumentAgeMillis = 5 * 60 * 1000;
private long offHeapCacheSize = -1;
@@ -724,6 +732,16 @@ public class DocumentMK implements Micro
this.clusterId = clusterId;
return this;
}
+
+ public Builder setCacheSegmentCount(int cacheSegmentCount) {
+ this.cacheSegmentCount = cacheSegmentCount;
+ return this;
+ }
+
+ public Builder setCacheStackMoveDistance(int cacheSegmentCount) {
+ this.cacheStackMoveDistance = cacheSegmentCount;
+ return this;
+ }
public int getClusterId() {
return clusterId;
@@ -943,11 +961,20 @@ public class DocumentMK implements Micro
private <K extends CacheValue, V extends CacheValue> Cache<K, V> buildCache(
long maxWeight) {
- if (LIRS_CACHE || persistentCacheURI != null) {
+ // by default, use the LIRS cache when using the persistent cache,
+ // but don't use it otherwise
+ boolean useLirs = persistentCacheURI != null;
+ // allow overriding this default via the system property
+ if (LIRS_CACHE != null) {
+ useLirs = LIRS_CACHE;
+ }
+ if (useLirs) {
return CacheLIRS.newBuilder().
weigher(weigher).
averageWeight(2000).
maximumWeight(maxWeight).
+ segmentCount(cacheSegmentCount).
+ stackMoveDistance(cacheStackMoveDistance).
recordStats().
build();
}
Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Thu Jun 4 12:49:49 2015
@@ -101,6 +101,8 @@ public class DocumentNodeStoreService {
private static final int DEFAULT_BLOB_CACHE_SIZE = 16;
private static final String DEFAULT_DB = "oak";
private static final String DEFAULT_PERSISTENT_CACHE = "";
+ private static final int DEFAULT_CACHE_SEGMENT_COUNT = 16;
+ private static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
private static final String PREFIX = "oak.documentstore.";
private static final String DESCRIPTION = "oak.nodestore.description";
@@ -160,6 +162,23 @@ public class DocumentNodeStoreService {
)
private static final String PROP_DOC_CHILDREN_CACHE_PERCENTAGE = "docChildrenCachePercentage";
+ @Property(intValue = DocumentMK.Builder.DEFAULT_CACHE_SEGMENT_COUNT,
+ label = "LIRS Cache Segment Count",
+ description = "The number of segments in the LIRS cache " +
+ "(default 16, a higher count means higher concurrency " +
+ "but slightly lower cache hit rate)"
+ )
+ private static final String PROP_CACHE_SEGMENT_COUNT = "cacheSegmentCount";
+
+ @Property(intValue = DocumentMK.Builder.DEFAULT_CACHE_STACK_MOVE_DISTANCE,
+ label = "LIRS Cache Stack Move Distance",
+ description = "The delay to move entries to the head of the queue " +
+ "in the LIRS cache " +
+ "(default 16, a higher value means higher concurrency " +
+ "but slightly lower cache hit rate)"
+ )
+ private static final String PROP_CACHE_STACK_MOVE_DISTANCE = "cacheStackMoveDistance";
+
private static final String PROP_OFF_HEAP_CACHE = "offHeapCache";
@Property(intValue = DEFAULT_CHANGES_SIZE,
@@ -338,6 +357,8 @@ public class DocumentNodeStoreService {
int changesSize = toInteger(prop(PROP_CHANGES_SIZE), DEFAULT_CHANGES_SIZE);
int blobCacheSize = toInteger(prop(PROP_BLOB_CACHE_SIZE), DEFAULT_BLOB_CACHE_SIZE);
String persistentCache = PropertiesUtil.toString(prop(PROP_PERSISTENT_CACHE), DEFAULT_PERSISTENT_CACHE);
+ int cacheSegmentCount = toInteger(prop(PROP_CACHE_SEGMENT_COUNT), DEFAULT_CACHE_SEGMENT_COUNT);
+ int cacheStackMoveDistance = toInteger(prop(PROP_CACHE_STACK_MOVE_DISTANCE), DEFAULT_CACHE_STACK_MOVE_DISTANCE);
DocumentMK.Builder mkBuilder =
new DocumentMK.Builder().
@@ -347,6 +368,8 @@ public class DocumentNodeStoreService {
childrenCachePercentage,
docChildrenCachePercentage,
diffCachePercentage).
+ setCacheSegmentCount(cacheSegmentCount).
+ setCacheStackMoveDistance(cacheStackMoveDistance).
offHeapCacheSize(offHeapCache * MB);
if (persistentCache != null && persistentCache.length() > 0) {
Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java Thu Jun 4 12:49:49 2015
@@ -19,12 +19,15 @@
package org.apache.jackrabbit.oak.cache;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
@@ -32,6 +35,71 @@ import org.junit.Test;
* Tests the LIRS cache by concurrently reading and writing.
*/
public class ConcurrentTest {
+
+ @Test
+ public void testLoaderBlock() throws Exception {
+ // access to the same segment should not be blocked while loading an entry
+ // only access to this entry is blocked
+ final CacheLIRS<Integer, Integer> cache = new CacheLIRS.Builder().
+ maximumWeight(100).averageWeight(10).build();
+ final Exception[] ex = new Exception[1];
+ int threadCount = 10;
+ Thread[] threads = new Thread[threadCount];
+ final AtomicBoolean stop = new AtomicBoolean();
+ final AtomicInteger nextKey = new AtomicInteger();
+ final AtomicLong additionalWait = new AtomicLong();
+ for (int i = 0; i < threadCount; i++) {
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ while (!stop.get()) {
+ final int key = nextKey.getAndIncrement();
+ final int wait = key;
+ Callable<Integer> callable = new Callable<Integer>() {
+ @Override
+ public Integer call() throws ExecutionException {
+ try {
+ Thread.sleep(wait);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ cache.get(key * 10);
+ return 1;
+ }
+ };
+ long start = System.currentTimeMillis();
+ try {
+ cache.get(key, callable);
+ } catch (Exception e) {
+ ex[0] = e;
+ }
+ long time = System.currentTimeMillis() - start;
+ additionalWait.addAndGet(time - wait);
+ cache.remove(key);
+ }
+ }
+ };
+ t.start();
+ threads[i] = t;
+ }
+ // test for 1000 ms
+ Thread.sleep(1000);
+ stop.set(true);
+ for (Thread t : threads) {
+ t.join(1000);
+ // if the thread is still alive after 1 second, we assume
+ // there is a deadlock - we just leave the threads running,
+ // but report a failure (what else could we do?)
+ if (t.isAlive()) {
+ assertFalse("Deadlock detected!", t.isAlive());
+ }
+ }
+ if (ex[0] != null) {
+ throw ex[0];
+ }
+ long add = additionalWait.get();
+ assertTrue("Had to wait unexpectedly long for other threads: " + add, add < 1000);
+ }
@Test
public void testCacheAccessInLoaderDeadlock() throws Exception {