You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/07 21:24:14 UTC
[3/5] git commit: Fix FileCacheService regressions patch by jbellis;
reviewed by pyaskevich and tested by Kai Wang for CASSANDRA-6149
Fix FileCacheService regressions
patch by jbellis; reviewed by pyaskevich and tested by Kai Wang for CASSANDRA-6149
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01a57eea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01a57eea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01a57eea
Branch: refs/heads/trunk
Commit: 01a57eea841e51fb4a97329ab9fa0f59d0b826f6
Parents: c374aca
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Oct 7 14:20:42 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Oct 7 14:20:42 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedRandomAccessReader.java | 5 ++
.../cassandra/io/util/RandomAccessReader.java | 2 +-
.../apache/cassandra/io/util/SegmentedFile.java | 3 +-
.../cassandra/service/FileCacheService.java | 87 ++++++++++----------
5 files changed, 54 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 94fa927..ddd976e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.2
+ * Fix FileCacheService regressions (CASSANDRA-6149)
* Never return WriteTimeout for CL.ANY (CASSANDRA-6032)
* Fix race conditions in bulk loader (CASSANDRA-6129)
* Add configurable metrics reporting (CASSANDRA-4430)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index b6cffa2..131a4d6 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -154,6 +154,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader
return checksumBytes.getInt(0);
}
+ public int getTotalBufferSize()
+ {
+ return super.getTotalBufferSize() + compressed.capacity();
+ }
+
@Override
public long length()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 4ceb3c4..9a03480 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -152,7 +152,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
return filePath;
}
- public int getBufferSize()
+ public int getTotalBufferSize()
{
return buffer.length;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 6231fd7..d4da177 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.util;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.util.Iterator;
@@ -57,7 +58,7 @@ public abstract class SegmentedFile
protected SegmentedFile(String path, long length, long onDiskLength)
{
- this.path = path;
+ this.path = new File(path).getAbsolutePath();
this.length = length;
this.onDiskLength = onDiskLength;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/01a57eea/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index e6bc3e5..c939a6f 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -22,11 +22,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,37 +41,47 @@ public class FileCacheService
public static FileCacheService instance = new FileCacheService();
- private final Cache<String, Queue<RandomAccessReader>> cache;
- private final FileCacheMetrics metrics = new FileCacheMetrics();
- public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+ private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
{
@Override
- public Queue<RandomAccessReader> call() throws Exception
+ public Queue<RandomAccessReader> call()
{
return new ConcurrentLinkedQueue<RandomAccessReader>();
}
};
+ private static final AtomicInteger memoryUsage = new AtomicInteger();
+
+ private final Cache<String, Queue<RandomAccessReader>> cache;
+ private final FileCacheMetrics metrics = new FileCacheMetrics();
+
protected FileCacheService()
{
+ RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+ {
+ @Override
+ public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+ {
+ Queue<RandomAccessReader> cachedInstances = notification.getValue();
+ if (cachedInstances == null)
+ return;
+
+ if (cachedInstances.size() > 0)
+ logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
+
+ for (RandomAccessReader reader : cachedInstances)
+ {
+ memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+ reader.deallocate();
+ }
+ }
+ };
+
cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
- .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
- .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
- .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
- {
- @Override
- public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
- {
- Queue<RandomAccessReader> cachedInstances = notification.getValue();
-
- if (cachedInstances == null)
- return;
-
- for (RandomAccessReader reader : cachedInstances)
- reader.deallocate();
- }
- })
- .build();
+ .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+ .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+ .removalListener(onRemove)
+ .build();
}
public RandomAccessReader get(String path)
@@ -81,12 +89,7 @@ public class FileCacheService
metrics.requests.mark();
Queue<RandomAccessReader> instances = getCacheFor(path);
-
- if (instances == null)
- return null;
-
RandomAccessReader result = instances.poll();
-
if (result != null)
metrics.hits.mark();
@@ -101,30 +104,30 @@ public class FileCacheService
}
catch (ExecutionException e)
{
- // if something bad happened, let's just carry on and return null
- // as dysfunctional queue should not interrupt normal operation
- logger.debug("Exception fetching cache", e);
+ throw new AssertionError(e);
}
-
- return null;
}
public void put(RandomAccessReader instance)
{
- // This wouldn't be precise sometimes when CRAR is used because
- // there is a way for users to dynamically change the size of the buffer,
- // but we don't expect that to happen frequently in production.
- // Doing accounting this way also allows us to avoid atomic CAS operation on read path.
- long memoryUsage = (cache.size() + 1) * instance.getBufferSize();
+ int memoryUsed = memoryUsage.get();
+ if (logger.isDebugEnabled())
+ logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());
- if (memoryUsage >= MEMORY_USAGE_THRESHOLD)
+ if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+ {
instance.deallocate();
+ }
else
+ {
+ memoryUsage.addAndGet(instance.getTotalBufferSize());
getCacheFor(instance.getPath()).add(instance);
+ }
}
public void invalidate(String path)
{
+ logger.debug("Invalidating cache for {}", path);
cache.invalidate(path);
}
@@ -133,7 +136,7 @@ public class FileCacheService
long n = 0;
for (Queue<RandomAccessReader> queue : cache.asMap().values())
for (RandomAccessReader reader : queue)
- n += reader.getBufferSize();
+ n += reader.getTotalBufferSize();
return n;
}
}