You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/07 21:24:14 UTC

[3/5] git commit: Fix FileCacheService regressions; patch by jbellis; reviewed by pyaskevich and tested by Kai Wang for CASSANDRA-6149

Fix FileCacheService regressions
patch by jbellis; reviewed by pyaskevich and tested by Kai Wang for CASSANDRA-6149


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01a57eea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01a57eea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01a57eea

Branch: refs/heads/trunk
Commit: 01a57eea841e51fb4a97329ab9fa0f59d0b826f6
Parents: c374aca
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Oct 7 14:20:42 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Oct 7 14:20:42 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedRandomAccessReader.java  |  5 ++
 .../cassandra/io/util/RandomAccessReader.java   |  2 +-
 .../apache/cassandra/io/util/SegmentedFile.java |  3 +-
 .../cassandra/service/FileCacheService.java     | 87 ++++++++++----------
 5 files changed, 54 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 94fa927..ddd976e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.2
+ * Fix FileCacheService regressions (CASSANDRA-6149)
  * Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
  * Fix race conditions in bulk loader (CASSANDRA-6129)
  * Add configurable metrics reporting (CASSANDRA-4430)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index b6cffa2..131a4d6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -154,6 +154,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         return checksumBytes.getInt(0);
     }
 
+    public int getTotalBufferSize()
+    {
+        return super.getTotalBufferSize() + compressed.capacity();
+    }
+
     @Override
     public long length()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 4ceb3c4..9a03480 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -152,7 +152,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }
 
-    public int getBufferSize()
+    public int getTotalBufferSize()
     {
         return buffer.length;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 6231fd7..d4da177 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.util;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.util.Iterator;
@@ -57,7 +58,7 @@ public abstract class SegmentedFile
 
     protected SegmentedFile(String path, long length, long onDiskLength)
     {
-        this.path = path;
+        this.path = new File(path).getAbsolutePath();
         this.length = length;
         this.onDiskLength = onDiskLength;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index e6bc3e5..c939a6f 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -22,11 +22,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,37 +41,47 @@ public class FileCacheService
 
     public static FileCacheService instance = new FileCacheService();
 
-    private final Cache<String, Queue<RandomAccessReader>> cache;
-    private final FileCacheMetrics metrics = new FileCacheMetrics();
-    public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
     {
         @Override
-        public Queue<RandomAccessReader> call() throws Exception
+        public Queue<RandomAccessReader> call()
         {
             return new ConcurrentLinkedQueue<RandomAccessReader>();
         }
     };
 
+    private static final AtomicInteger memoryUsage = new AtomicInteger();
+
+    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final FileCacheMetrics metrics = new FileCacheMetrics();
+
     protected FileCacheService()
     {
+        RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+        {
+            @Override
+            public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+            {
+                Queue<RandomAccessReader> cachedInstances = notification.getValue();
+                if (cachedInstances == null)
+                    return;
+
+                if (cachedInstances.size() > 0)
+                    logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
+
+                for (RandomAccessReader reader : cachedInstances)
+                {
+                    memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+                    reader.deallocate();
+                }
+            }
+        };
+
         cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
-                            .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
-                            .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
-                            .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
-                            {
-                                @Override
-                                public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
-                                {
-                                    Queue<RandomAccessReader> cachedInstances = notification.getValue();
-
-                                    if (cachedInstances == null)
-                                        return;
-
-                                    for (RandomAccessReader reader : cachedInstances)
-                                        reader.deallocate();
-                                }
-                            })
-                            .build();
+                .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+                .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+                .removalListener(onRemove)
+                .build();
     }
 
     public RandomAccessReader get(String path)
@@ -81,12 +89,7 @@ public class FileCacheService
         metrics.requests.mark();
 
         Queue<RandomAccessReader> instances = getCacheFor(path);
-
-        if (instances == null)
-            return null;
-
         RandomAccessReader result = instances.poll();
-
         if (result != null)
             metrics.hits.mark();
 
@@ -101,30 +104,30 @@ public class FileCacheService
         }
         catch (ExecutionException e)
         {
-            // if something bad happened, let's just carry on and return null
-            // as dysfunctional queue should not interrupt normal operation
-            logger.debug("Exception fetching cache", e);
+            throw new AssertionError(e);
         }
-
-        return null;
     }
 
     public void put(RandomAccessReader instance)
     {
-        // This wouldn't be precise sometimes when CRAR is used because
-        // there is a way for users to dynamically change the size of the buffer,
-        // but we don't expect that to happen frequently in production.
-        // Doing accounting this way also allows us to avoid atomic CAS operation on read path.
-        long memoryUsage = (cache.size() + 1) * instance.getBufferSize();
+        int memoryUsed = memoryUsage.get();
+        if (logger.isDebugEnabled())
+            logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());
 
-        if (memoryUsage >= MEMORY_USAGE_THRESHOLD)
+        if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+        {
             instance.deallocate();
+        }
         else
+        {
+            memoryUsage.addAndGet(instance.getTotalBufferSize());
             getCacheFor(instance.getPath()).add(instance);
+        }
     }
 
     public void invalidate(String path)
     {
+        logger.debug("Invalidating cache for {}", path);
         cache.invalidate(path);
     }
 
@@ -133,7 +136,7 @@ public class FileCacheService
         long n = 0;
         for (Queue<RandomAccessReader> queue : cache.asMap().values())
             for (RandomAccessReader reader : queue)
-                n += reader.getBufferSize();
+                n += reader.getTotalBufferSize();
         return n;
     }
 }