You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/17 23:58:09 UTC

[3/6] git commit: add file_cache_size_in_mb setting patch by pyaskevich and jbellis for CASSANDRA-5661

add file_cache_size_in_mb setting
patch by pyaskevich and jbellis for CASSANDRA-5661


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfe49376
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfe49376
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfe49376

Branch: refs/heads/trunk
Commit: dfe49376097cf73c04c0d8506782263a2e820cb0
Parents: 6e2401e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:49:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:53:11 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 229 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb4f3f4..e4066ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * add file_cache_size_in_mb setting (CASSANDRA-5661)
  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 27ac09b..2916ed9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -275,6 +275,10 @@ reduce_cache_capacity_to: 0.6
 concurrent_reads: 32
 concurrent_writes: 32
 
+# Total memory to use for pooled sstable-reading buffers. Defaults to
+# the smaller of 1/4 of the heap or 512MB.
+# file_cache_size_in_mb: 512
+
 # Total memory to use for memtables.  Cassandra will flush the largest
 # memtable when this much memory is used.
 # If omitted, Cassandra will set it to 1/3 of the heap.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a924a4c..c5a4aa1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -169,6 +169,8 @@ public class Config
     public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
     public boolean populate_io_cache_on_flush = false;
 
+    public Integer file_cache_size_in_mb; // budget in MB for pooled sstable-reading buffers; null => DatabaseDescriptor computes a default at load time
+
     public boolean inter_dc_tcp_nodelay = true;
 
     public String memtable_allocator = "SlabAllocator";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8e3cbe2..dbf0905 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -272,6 +272,9 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("concurrent_replicates must be at least 2");
             }
 
+            if (conf.file_cache_size_in_mb == null)
+                conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
+
             if (conf.memtable_total_space_in_mb == null)
                 conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
             if (conf.memtable_total_space_in_mb <= 0)
@@ -1209,6 +1212,11 @@ public class DatabaseDescriptor
         return conf.memtable_flush_queue_size;
     }
 
+    /**
+     * @return the memory budget, in megabytes, for pooled sstable-reading buffers:
+     *         either the configured file_cache_size_in_mb or the default filled in
+     *         while the configuration was loaded
+     */
+    public static int getFileCacheSizeInMB()
+    {
+        Integer sizeInMB = conf.file_cache_size_in_mb;
+        return sizeInMB.intValue();
+    }
+
     public static int getTotalMemtableSpaceInMB()
     {
         // should only be called if estimatesRealMemtableSize() is true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 4173d5a..892611c 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
-    public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>();
-
     protected PoolingSegmentedFile(String path, long length)
     {
         super(path, length);
@@ -36,9 +33,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = pool.poll();
+        RandomAccessReader reader = FileCacheService.instance.get(path); // pooled reader for this path, or null when none is available
+
         if (reader == null)
             reader = createReader(path);
+
         reader.seek(position);
         return reader;
     }
@@ -47,12 +46,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public void recycle(RandomAccessReader reader)
     {
-        pool.add(reader);
+        FileCacheService.instance.put(reader); // pools the reader for reuse, or deallocates it when the shared cache is at its memory budget
     }
 
     public void cleanup()
     {
-        for (RandomAccessReader reader : pool)
-            reader.deallocate();
+        FileCacheService.instance.invalidate(path); // drops this path's pooled readers; the cache's removal listener deallocates them
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 3210372..64c5cf7 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -186,6 +186,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }
 
+    /**
+     * @return the length, in bytes, of this reader's internal read buffer
+     */
+    public int getBufferSize()
+    {
+        return this.buffer.length;
+    }
+
     public void reset()
     {
         seek(markedPointer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
new file mode 100644
index 0000000..9b21de6
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.cassandra.service.FileCacheService;
+
+/**
+ * Metrics for {@link FileCacheService}, all registered under that class's metric group:
+ * request and hit meters, a derived hit ratio, and the current pooled-buffer footprint.
+ */
+public class FileCacheMetrics
+{
+    /** Meter marked whenever a pooled reader is successfully reused */
+    public final Meter hits;
+    /** Meter marked on every lookup, hit or miss */
+    public final Meter requests;
+    /** hits / requests, computed from the two meters' counts */
+    public final Gauge<Double> hitRate;
+    /** Bytes of reader buffers currently held by the file cache */
+    public final Gauge<Long> size;
+
+    public FileCacheMetrics()
+    {
+        hits = Metrics.newMeter(nameFor("Hits"), "hits", TimeUnit.SECONDS);
+        requests = Metrics.newMeter(nameFor("Requests"), "requests", TimeUnit.SECONDS);
+
+        RatioGauge ratio = new RatioGauge()
+        {
+            protected double getNumerator()
+            {
+                return hits.count();
+            }
+
+            protected double getDenominator()
+            {
+                return requests.count();
+            }
+        };
+        hitRate = Metrics.newGauge(nameFor("HitRate"), ratio);
+
+        Gauge<Long> footprint = new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return FileCacheService.instance.sizeInBytes();
+            }
+        };
+        size = Metrics.newGauge(nameFor("Size"), footprint);
+    }
+
+    /** Builds the MetricName for {@code metric} within the FileCacheService group. */
+    private static MetricName nameFor(String metric)
+    {
+        return new MetricName(FileCacheService.class, metric);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
new file mode 100644
index 0000000..9dd1b15
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.FileCacheMetrics;
+
+/**
+ * Process-wide pool of {@link RandomAccessReader}s keyed by file path, so sstable
+ * reads can reuse already-allocated read buffers instead of creating a new reader
+ * per segment access.
+ *
+ * The total size of pooled buffers is bounded by file_cache_size_in_mb: a reader
+ * handed to {@link #put} that would push the pool past that budget is deallocated
+ * instead of pooled. A path's queue is dropped, and its readers deallocated, once
+ * the path has not been accessed for AFTER_ACCESS_EXPIRATION milliseconds or when
+ * {@link #invalidate} is called for it.
+ */
+public class FileCacheService
+{
+    private static final Logger logger = LoggerFactory.getLogger(FileCacheService.class);
+
+    // Widen to long before multiplying: the int product overflows for budgets of 2048 MB and up.
+    private static final long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
+    private static final int AFTER_ACCESS_EXPIRATION = 512; // in millis
+
+    public static FileCacheService instance = new FileCacheService();
+
+    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final FileCacheMetrics metrics = new FileCacheMetrics();
+    // Bytes of reader buffers currently sitting in the pool; readers handed out by get() are not counted.
+    private final AtomicLong memoryUsage = new AtomicLong();
+    public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    {
+        @Override
+        public Queue<RandomAccessReader> call() throws Exception
+        {
+            return new ConcurrentLinkedQueue<RandomAccessReader>();
+        }
+    };
+
+    protected FileCacheService()
+    {
+        cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
+                            .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+                            .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+                            .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
+                            {
+                                @Override
+                                public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+                                {
+                                    Queue<RandomAccessReader> cachedInstances = notification.getValue();
+
+                                    if (cachedInstances == null)
+                                        return;
+
+                                    // Drain with poll() rather than iterating: each reader is then either
+                                    // handed out by a concurrent get() or deallocated here, never both,
+                                    // and its bytes are released from memoryUsage exactly once.
+                                    RandomAccessReader reader;
+                                    while ((reader = cachedInstances.poll()) != null)
+                                    {
+                                        memoryUsage.addAndGet(-reader.getBufferSize());
+                                        reader.deallocate();
+                                    }
+                                }
+                            })
+                            .build();
+    }
+
+    /**
+     * Takes a pooled reader for {@code path} out of the cache.
+     *
+     * @param path file whose reader is wanted
+     * @return a previously pooled reader, or null when none is pooled (or the
+     *         per-path queue could not be obtained); the caller must then create one
+     */
+    public RandomAccessReader get(String path)
+    {
+        metrics.requests.mark();
+
+        Queue<RandomAccessReader> instances = getCacheFor(path);
+
+        if (instances == null)
+            return null;
+
+        RandomAccessReader result = instances.poll();
+
+        if (result != null)
+        {
+            metrics.hits.mark();
+            // the reader has left the pool, so its buffer no longer counts against the budget
+            memoryUsage.addAndGet(-result.getBufferSize());
+        }
+
+        return result;
+    }
+
+    /**
+     * @return the reader queue for {@code path}, created on demand, or null if the
+     *         cache failed to produce one
+     */
+    private Queue<RandomAccessReader> getCacheFor(String path)
+    {
+        try
+        {
+            return cache.get(path, cacheForPathCreator);
+        }
+        catch (ExecutionException e)
+        {
+            // if something bad happened, let's just carry on and return null
+            // as dysfunctional queue should not interrupt normal operation
+            logger.debug("Exception fetching cache", e);
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns a reader to the pool, or deallocates it if pooling it would exceed
+     * file_cache_size_in_mb or no queue is available for its path.
+     *
+     * @param instance reader previously obtained from {@link #get} or freshly created
+     */
+    public void put(RandomAccessReader instance)
+    {
+        Queue<RandomAccessReader> instances = getCacheFor(instance.getPath());
+        if (instances == null)
+        {
+            // no queue for this path; release the buffer rather than dropping the reference
+            instance.deallocate();
+            return;
+        }
+
+        // Accounting uses the buffer size at the moment a reader enters or leaves the pool,
+        // so a CRAR whose buffer was resized while checked out is still tracked consistently
+        // (assuming buffers are not resized while pooled).
+        // Reserve first and roll back on overflow so concurrent put()s cannot jointly
+        // push the pool past its budget.
+        // NOTE(review): a put() racing with invalidation of the same path can still leave
+        // the reader in a queue that was already removed from the cache.
+        int bufferSize = instance.getBufferSize();
+        if (memoryUsage.addAndGet(bufferSize) > MEMORY_USAGE_THRESHOLD)
+        {
+            memoryUsage.addAndGet(-bufferSize);
+            instance.deallocate();
+        }
+        else
+        {
+            instances.add(instance);
+        }
+    }
+
+    /**
+     * Drops all pooled readers for {@code path}; the removal listener deallocates them.
+     */
+    public void invalidate(String path)
+    {
+        cache.invalidate(path);
+    }
+
+    /**
+     * @return total buffer bytes of the readers currently pooled, summed over all paths
+     */
+    public long sizeInBytes()
+    {
+        long n = 0;
+        for (Queue<RandomAccessReader> queue : cache.asMap().values())
+            for (RandomAccessReader reader : queue)
+                n += reader.getBufferSize();
+        return n;
+    }
+}