You are viewing a plain text version of this content. (The hyperlink to the canonical version is not preserved in this plain-text copy.)
Posted to oak-commits@jackrabbit.apache.org by th...@apache.org on 2015/06/04 14:49:49 UTC

svn commit: r1683538 - in /jackrabbit/oak/branches/1.2/oak-core/src: main/java/org/apache/jackrabbit/oak/cache/ main/java/org/apache/jackrabbit/oak/plugins/document/ test/java/org/apache/jackrabbit/oak/cache/

Author: thomasm
Date: Thu Jun  4 12:49:49 2015
New Revision: 1683538

URL: http://svn.apache.org/r1683538
Log:
OAK-2957 LIRS cache: config options for segment count and stack move distance
OAK-2830 LIRS cache: avoid concurrent loading of the same entry if loading is slow

Modified:
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
    jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
    jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/cache/CacheLIRS.java Thu Jun  4 12:49:49 2015
@@ -24,9 +24,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nullable;
 
@@ -96,6 +96,15 @@ public class CacheLIRS<K, V> implements
     private final CacheLoader<K, V> loader;
     
     /**
+     * A concurrent hash map of keys where loading is in progress. Key: the
+     * cache key. Value: a synchronization object. The threads that wait for the
+     * value to be loaded need to wait on the synchronization object. The
+     * loading thread will notify all waiting threads once loading is done.
+     */
+    final ConcurrentHashMap<K, Object> loadingInProgress = 
+            new ConcurrentHashMap<K, Object>();
+    
+    /**
      * Create a new cache with the given number of entries, and the default
      * settings (an average size of 1 per entry, 16 segments, and stack move
      * distance equals to the maximum number of entries divided by 100).
@@ -692,11 +701,6 @@ public class CacheLIRS<K, V> implements
          * The number of times any item was moved to the top of the stack.
          */
         private int stackMoveCounter;
-        
-        /**
-         * Whether the current segment is currently calling a value loader.
-         */
-        private AtomicBoolean isLoading = new AtomicBoolean();
 
         /**
          * Create a new cache.
@@ -842,38 +846,62 @@ public class CacheLIRS<K, V> implements
         }
         
         V get(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
-            // we can not synchronize on a per-segment object while loading, as
-            // the value loader could access the cache (for example, using put,
-            // or another get with a loader), which might result in a deadlock
-            
-            // for at most 100 ms (100 x 1 ms), we avoid concurrent loading of
-            // multiple values in the same segment
-            for (int i = 0; i < 100; i++) {
+            // we can not synchronize on a per-segment object while loading,
+            // because we don't want to block cache access while loading, and
+            // because the value loader could access the cache (for example,
+            // using put, or another get with a loader), which might result in a
+            // deadlock
+            // we loop here because another thread might load the value,
+            // but loading might fail there, so we might need to repeat this
+            while (true) {
                 V value = get(key, hash);
+                // the (hopefully) normal case
                 if (value != null) {
                     return value;
                 }
-                if (!isLoading.getAndSet(true)) {
-                    value = load(key, hash, valueLoader);
-                    isLoading.set(false);
-                    synchronized (isLoading) {
-                        // notify the other thread
-                        isLoading.notifyAll();
-                    }
-                } else {
-                    // wait a bit, but at most until the other thread completed
-                    // loading
-                    synchronized (isLoading) {
+                ConcurrentHashMap<K, Object> loading = cache.loadingInProgress;
+                // the object we have to wait for in case another thread loads
+                // this value
+                Object alreadyLoading;
+                // synchronize on this object, even before we put it in the
+                // loading map, so that all other threads that get this object
+                // can synchronize and wait for it
+                Object loadNow = new Object();
+                // we synchronize a bit early here, but that's fine (we don't
+                // optimize for the case where loading is extremely quick)
+                synchronized (loadNow) {
+                    alreadyLoading = loading.putIfAbsent(key, loadNow);
+                    if (alreadyLoading == null) {
+                        // we are loading ourselves
                         try {
-                            isLoading.wait(1);
-                        } catch (InterruptedException e) {
-                            // ignore
+                            return load(key, hash, valueLoader);
+                        } finally {
+                            loading.remove(key);
+                            // notify other threads
+                            loadNow.notifyAll();
                         }
                     }
+                }             
+                // another thread is (or was) already loading
+                synchronized (alreadyLoading) {
+                    // loading might have been finished, so check again
+                    Object alreadyLoading2 = loading.get(key);
+                    if (alreadyLoading2 != alreadyLoading) {
+                        // loading has completed before we could synchronize,
+                        // so we repeat
+                        continue;
+                    }
+                    // still loading: wait
+                    try {
+                        // we could wait longer than 10 ms, but a short timeout
+                        // protects us in case notify is not called for some
+                        // weird reason (for example out of memory)
+                        alreadyLoading.wait(10);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
                 }
             }
-            // give up (that means, the same value might be loaded concurrently)
-            return load(key, hash, valueLoader);
         }
             
         V load(K key, int hash, Callable<? extends V> valueLoader) throws ExecutionException {
@@ -1375,6 +1403,8 @@ public class CacheLIRS<K, V> implements
         private Weigher<?, ?> weigher;
         private long maxWeight;
         private int averageWeight = 100;
+        private int segmentCount = 16;
+        private int stackMoveDistance = 16;
 
         public Builder recordStats() {
             return this;
@@ -1401,6 +1431,24 @@ public class CacheLIRS<K, V> implements
             return this;
         }
 
+        public Builder segmentCount(int segmentCount) {
+            if (Integer.bitCount(segmentCount) != 1 || segmentCount < 0 || segmentCount > 65536) {
+                LOG.warn("Illegal segment count: " + segmentCount + ", using 16");
+                segmentCount = 16;
+            }
+            this.segmentCount = segmentCount;
+            return this;
+        }
+
+        public Builder stackMoveDistance(int stackMoveDistance) {
+            if (stackMoveDistance < 0) {
+                LOG.warn("Illegal stack move distance: " + stackMoveDistance + ", using 16");
+                stackMoveDistance = 16;
+            }            
+            this.stackMoveDistance = stackMoveDistance;
+            return this;
+        }
+
         public <K, V> CacheLIRS<K, V> build() {
             return build(null);
         }
@@ -1409,7 +1457,8 @@ public class CacheLIRS<K, V> implements
                 CacheLoader<K, V> cacheLoader) {
             @SuppressWarnings("unchecked")
             Weigher<K, V> w = (Weigher<K, V>) weigher;
-            return new CacheLIRS<K, V>(w, maxWeight, averageWeight, 16, 16, cacheLoader);
+            return new CacheLIRS<K, V>(w, maxWeight, averageWeight, 
+                    segmentCount, stackMoveDistance, cacheLoader);
         }
 
     }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java Thu Jun  4 12:49:49 2015
@@ -83,10 +83,14 @@ public class DocumentMK implements Micro
             "oak.documentMK.manyChildren", 50);
 
     /**
-     * Enable the LIRS cache.
+     * Enable or disable the LIRS cache (null to use the default setting for this configuration).
      */
-    static final boolean LIRS_CACHE = Boolean.parseBoolean(
-            System.getProperty("oak.documentMK.lirsCache", "false"));
+    static final Boolean LIRS_CACHE;
+    
+    static {
+        String s = System.getProperty("oak.documentMK.lirsCache");
+        LIRS_CACHE = s == null ? null : Boolean.parseBoolean(s);
+    }
 
     /**
      * Enable fast diff operations.
@@ -503,6 +507,8 @@ public class DocumentMK implements Micro
         public static final int DEFAULT_CHILDREN_CACHE_PERCENTAGE = 10;
         public static final int DEFAULT_DIFF_CACHE_PERCENTAGE = 5;
         public static final int DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE = 3;
+        public static final int DEFAULT_CACHE_SEGMENT_COUNT = 16;
+        public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
         private DocumentNodeStore nodeStore;
         private DocumentStore documentStore;
         private DiffCache diffCache;
@@ -519,6 +525,8 @@ public class DocumentMK implements Micro
         private int childrenCachePercentage = DEFAULT_CHILDREN_CACHE_PERCENTAGE;
         private int diffCachePercentage = DEFAULT_DIFF_CACHE_PERCENTAGE;
         private int docChildrenCachePercentage = DEFAULT_DOC_CHILDREN_CACHE_PERCENTAGE;
+        private int cacheSegmentCount = DEFAULT_CACHE_SEGMENT_COUNT;
+        private int cacheStackMoveDistance = DEFAULT_CACHE_STACK_MOVE_DISTANCE;
         private boolean useSimpleRevision;
         private long splitDocumentAgeMillis = 5 * 60 * 1000;
         private long offHeapCacheSize = -1;
@@ -724,6 +732,16 @@ public class DocumentMK implements Micro
             this.clusterId = clusterId;
             return this;
         }
+        
+        public Builder setCacheSegmentCount(int cacheSegmentCount) {
+            this.cacheSegmentCount = cacheSegmentCount;
+            return this;
+        }
+        
+        public Builder setCacheStackMoveDistance(int cacheStackMoveDistance) {
+            this.cacheStackMoveDistance = cacheStackMoveDistance;
+            return this;
+        }
 
         public int getClusterId() {
             return clusterId;
@@ -943,11 +961,20 @@ public class DocumentMK implements Micro
         
         private <K extends CacheValue, V extends CacheValue> Cache<K, V> buildCache(
                 long maxWeight) {
-            if (LIRS_CACHE || persistentCacheURI != null) {
+            // by default, use the LIRS cache when using the persistent cache,
+            // but don't use it otherwise
+            boolean useLirs = persistentCacheURI != null;
+            // allow overriding this via the oak.documentMK.lirsCache system property
+            if (LIRS_CACHE != null) {
+                useLirs = LIRS_CACHE;
+            }
+            if (useLirs) {
                 return CacheLIRS.newBuilder().
                         weigher(weigher).
                         averageWeight(2000).
                         maximumWeight(maxWeight).
+                        segmentCount(cacheSegmentCount).
+                        stackMoveDistance(cacheStackMoveDistance).
                         recordStats().
                         build();
             }

Modified: jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java Thu Jun  4 12:49:49 2015
@@ -101,6 +101,8 @@ public class DocumentNodeStoreService {
     private static final int DEFAULT_BLOB_CACHE_SIZE = 16;
     private static final String DEFAULT_DB = "oak";
     private static final String DEFAULT_PERSISTENT_CACHE = "";
+    private static final int DEFAULT_CACHE_SEGMENT_COUNT = 16;
+    private static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16;
     private static final String PREFIX = "oak.documentstore.";
     private static final String DESCRIPTION = "oak.nodestore.description";
 
@@ -160,6 +162,23 @@ public class DocumentNodeStoreService {
     )
     private static final String PROP_DOC_CHILDREN_CACHE_PERCENTAGE = "docChildrenCachePercentage";
 
+    @Property(intValue = DocumentMK.Builder.DEFAULT_CACHE_SEGMENT_COUNT,
+            label = "LIRS Cache Segment Count",
+            description = "The number of segments in the LIRS cache " + 
+                    "(default 16, a higher count means higher concurrency " + 
+                    "but slightly lower cache hit rate)"
+    )
+    private static final String PROP_CACHE_SEGMENT_COUNT = "cacheSegmentCount";
+
+    @Property(intValue = DocumentMK.Builder.DEFAULT_CACHE_STACK_MOVE_DISTANCE,
+            label = "LIRS Cache Stack Move Distance",
+            description = "The delay to move entries to the head of the queue " + 
+                    "in the LIRS cache " +
+                    "(default 16, a higher value means higher concurrency " + 
+                    "but slightly lower cache hit rate)"
+    )
+    private static final String PROP_CACHE_STACK_MOVE_DISTANCE = "cacheStackMoveDistance";
+
     private static final String PROP_OFF_HEAP_CACHE = "offHeapCache";
 
     @Property(intValue =  DEFAULT_CHANGES_SIZE,
@@ -338,6 +357,8 @@ public class DocumentNodeStoreService {
         int changesSize = toInteger(prop(PROP_CHANGES_SIZE), DEFAULT_CHANGES_SIZE);
         int blobCacheSize = toInteger(prop(PROP_BLOB_CACHE_SIZE), DEFAULT_BLOB_CACHE_SIZE);
         String persistentCache = PropertiesUtil.toString(prop(PROP_PERSISTENT_CACHE), DEFAULT_PERSISTENT_CACHE);
+        int cacheSegmentCount = toInteger(prop(PROP_CACHE_SEGMENT_COUNT), DEFAULT_CACHE_SEGMENT_COUNT);
+        int cacheStackMoveDistance = toInteger(prop(PROP_CACHE_STACK_MOVE_DISTANCE), DEFAULT_CACHE_STACK_MOVE_DISTANCE);
 
         DocumentMK.Builder mkBuilder =
                 new DocumentMK.Builder().
@@ -347,6 +368,8 @@ public class DocumentNodeStoreService {
                         childrenCachePercentage, 
                         docChildrenCachePercentage, 
                         diffCachePercentage).
+                setCacheSegmentCount(cacheSegmentCount).
+                setCacheStackMoveDistance(cacheStackMoveDistance).
                 offHeapCacheSize(offHeapCache * MB);
         
         if (persistentCache != null && persistentCache.length() > 0) {

Modified: jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java?rev=1683538&r1=1683537&r2=1683538&view=diff
==============================================================================
--- jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java (original)
+++ jackrabbit/oak/branches/1.2/oak-core/src/test/java/org/apache/jackrabbit/oak/cache/ConcurrentTest.java Thu Jun  4 12:49:49 2015
@@ -19,12 +19,15 @@
 package org.apache.jackrabbit.oak.cache;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.Test;
 
@@ -32,6 +35,71 @@ import org.junit.Test;
  * Tests the LIRS cache by concurrently reading and writing.
  */
 public class ConcurrentTest {
+
+    @Test
+    public void testLoaderBlock() throws Exception {
+        // access to the same segment should not be blocked while loading an entry
+        // only access to this entry is blocked
+        final CacheLIRS<Integer, Integer> cache = new CacheLIRS.Builder().
+                maximumWeight(100).averageWeight(10).build();
+        final Exception[] ex = new Exception[1];
+        int threadCount = 10;
+        Thread[] threads = new Thread[threadCount];
+        final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicInteger nextKey = new AtomicInteger();
+        final AtomicLong additionalWait = new AtomicLong();
+        for (int i = 0; i < threadCount; i++) {
+            Thread t = new Thread() {
+                @Override
+                public void run() {
+                    while (!stop.get()) {
+                        final int key = nextKey.getAndIncrement();
+                        final int wait = key;
+                        Callable<Integer> callable = new Callable<Integer>() {
+                            @Override
+                            public Integer call() throws ExecutionException {
+                                try {
+                                    Thread.sleep(wait);
+                                } catch (InterruptedException e) {
+                                    // ignore
+                                }
+                                cache.get(key * 10);
+                                return 1;
+                            }
+                        };
+                        long start = System.currentTimeMillis();
+                        try {
+                            cache.get(key, callable);
+                        } catch (Exception e) {
+                            ex[0] = e;
+                        }
+                        long time = System.currentTimeMillis() - start;
+                        additionalWait.addAndGet(time - wait);
+                        cache.remove(key);
+                    }
+                }
+            };
+            t.start();
+            threads[i] = t;
+        }
+        // test for 1000 ms
+        Thread.sleep(1000);
+        stop.set(true);
+        for (Thread t : threads) {
+            t.join(1000);
+            // if the thread is still alive after 1 second, we assume
+            // there is a deadlock - we just leave the threads alive,
+            // but report a failure (what else could we do?)
+            if (t.isAlive()) {
+                assertFalse("Deadlock detected!", t.isAlive());
+            }
+        }
+        if (ex[0] != null) {
+            throw ex[0];
+        }        
+        long add = additionalWait.get();
+        assertTrue("Had to wait unexpectedly long for other threads: " + add, add < 1000);
+    }
     
     @Test
     public void testCacheAccessInLoaderDeadlock() throws Exception {