You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2015/04/13 16:40:45 UTC

accumulo git commit: ACCUMULO-3349 fixing concurrency of LRU block cache

Repository: accumulo
Updated Branches:
  refs/heads/master 9abe28d6a -> 6d2cd6596


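ACCUMULO-3349 fixing concurrency of LRU block cache

The block insert path now uses ConcurrentHashMap.putIfAbsent instead of an
unconditional put. When two threads race to cache the same block, only the
thread that actually adds it updates the size and element counters (and may
trigger eviction); the other thread records a duplicate read and touches the
existing entry. The remaining changes only switch to the diamond operator.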


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6d2cd659
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6d2cd659
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6d2cd659

Branch: refs/heads/master
Commit: 6d2cd6596fc148e7eb951ea42a45fd720445aad5
Parents: 9abe28d
Author: John Vines <vi...@apache.org>
Authored: Mon Apr 13 10:39:52 2015 -0400
Committer: John Vines <vi...@apache.org>
Committed: Mon Apr 13 10:39:52 2015 -0400

----------------------------------------------------------------------
 .../file/blockfile/cache/LruBlockCache.java     | 24 ++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d2cd659/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
index d4b875c..2bd1a38 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
@@ -198,7 +198,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     }
     this.maxSize = maxSize;
     this.blockSize = blockSize;
-    map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
+    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
     this.minFactor = minFactor;
     this.acceptableFactor = acceptableFactor;
     this.singleFactor = singleFactor;
@@ -254,14 +254,20 @@ public class LruBlockCache implements BlockCache, HeapSize {
     if (cb != null) {
       stats.duplicateReads();
       cb.access(count.incrementAndGet());
-
     } else {
       cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
-      long newSize = size.addAndGet(cb.heapSize());
-      map.put(blockName, cb);
-      elements.incrementAndGet();
-      if (newSize > acceptableSize() && !evictionInProgress) {
-        runEviction();
+      CachedBlock currCb = map.putIfAbsent(blockName, cb);
+      if (currCb != null) {
+        stats.duplicateReads();
+        cb = currCb;
+        cb.access(count.incrementAndGet());
+      } else {
+        // Actually added block to cache
+        long newSize = size.addAndGet(cb.heapSize());
+        elements.incrementAndGet();
+        if (newSize > acceptableSize() && !evictionInProgress) {
+          runEviction();
+        }
       }
     }
 
@@ -364,7 +370,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
         }
       }
 
-      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);
+      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
 
       bucketQueue.add(bucketSingle);
       bucketQueue.add(bucketMulti);
@@ -517,7 +523,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     public EvictionThread(LruBlockCache cache) {
       super("LruBlockCache.EvictionThread");
       setDaemon(true);
-      this.cache = new WeakReference<LruBlockCache>(cache);
+      this.cache = new WeakReference<>(cache);
     }
 
     public synchronized boolean running() {