Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/17 23:58:07 UTC

[1/6] git commit: add file_cache_size_in_mb setting patch by pyaskevich and jbellis for CASSANDRA-5661

Updated Branches:
  refs/heads/cassandra-1.2 6e2401e00 -> dfe493760
  refs/heads/cassandra-2.0 cfa097cdd -> 1e0d9513b
  refs/heads/trunk 3700e0121 -> 494ba4582


add file_cache_size_in_mb setting
patch by pyaskevich and jbellis for CASSANDRA-5661


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfe49376
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfe49376
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfe49376

Branch: refs/heads/cassandra-1.2
Commit: dfe49376097cf73c04c0d8506782263a2e820cb0
Parents: 6e2401e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:49:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:53:11 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 229 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb4f3f4..e4066ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * add file_cache_size_in_mb setting (CASSANDRA-5661)
  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 27ac09b..2916ed9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -275,6 +275,10 @@ reduce_cache_capacity_to: 0.6
 concurrent_reads: 32
 concurrent_writes: 32
 
+# Total memory to use for sstable-reading buffers.  Defaults to
+# the smaller of 1/4 of heap or 512MB.
+# file_cache_size_in_mb: 512
+
 # Total memory to use for memtables.  Cassandra will flush the largest
 # memtable when this much memory is used.
 # If omitted, Cassandra will set it to 1/3 of the heap.
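
For operators reading this patch: the knob ships commented out, so the
min(512 MB, heap/4) default applies unless it is set explicitly. A minimal
cassandra.yaml fragment (the value 256 is illustrative, not a recommendation):

    # cap pooled sstable-read buffers at a fixed size
    file_cache_size_in_mb: 256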

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a924a4c..c5a4aa1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -169,6 +169,8 @@ public class Config
     public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
     public boolean populate_io_cache_on_flush = false;
 
+    public Integer file_cache_size_in_mb;
+
     public boolean inter_dc_tcp_nodelay = true;
 
     public String memtable_allocator = "SlabAllocator";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8e3cbe2..dbf0905 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -272,6 +272,9 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("concurrent_replicates must be at least 2");
             }
 
+            if (conf.file_cache_size_in_mb == null)
+                conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
+
             if (conf.memtable_total_space_in_mb == null)
                 conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
             if (conf.memtable_total_space_in_mb <= 0)
@@ -1209,6 +1212,11 @@ public class DatabaseDescriptor
         return conf.memtable_flush_queue_size;
     }
 
+    public static int getFileCacheSizeInMB()
+    {
+        return conf.file_cache_size_in_mb;
+    }
+
     public static int getTotalMemtableSpaceInMB()
     {
         // should only be called if estimatesRealMemtableSize() is true
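
The default above is the smaller of 512 MB and a quarter of the JVM heap:
maxMemory() divided by 4 * 1048576 gives quarter-heap in megabytes, which
Math.min then clamps at 512. A standalone restatement of that arithmetic
(the heap sizes in the comments are hypothetical):

    // mirrors the default computed in DatabaseDescriptor above
    static int defaultFileCacheSizeInMB(long maxHeapBytes)
    {
        return Math.min(512, (int) (maxHeapBytes / (4 * 1048576)));
    }
    // defaultFileCacheSizeInMB(8L << 30) == 512   (8 GB heap, clamped at 512)
    // defaultFileCacheSizeInMB(1L << 30) == 256   (1 GB heap, heap/4 wins)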

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 4173d5a..892611c 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
-    public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>();
-
     protected PoolingSegmentedFile(String path, long length)
     {
         super(path, length);
@@ -36,9 +33,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = pool.poll();
+        RandomAccessReader reader = FileCacheService.instance.get(path);
+
         if (reader == null)
             reader = createReader(path);
+
         reader.seek(position);
         return reader;
     }
@@ -47,12 +46,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public void recycle(RandomAccessReader reader)
     {
-        pool.add(reader);
+        FileCacheService.instance.put(reader);
     }
 
     public void cleanup()
     {
-        for (RandomAccessReader reader : pool)
-            reader.deallocate();
+        FileCacheService.instance.invalidate(path);
     }
 }
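
With this change the per-file reader queue moves off PoolingSegmentedFile and
into the shared, path-keyed FileCacheService. The check-out/return contract is
restated below, straight from getSegment() and recycle() above; this is a
sketch of the flow, not new API, and createReader() is the class's own factory:

    RandomAccessReader reader = FileCacheService.instance.get(path); // null on miss
    if (reader == null)
        reader = createReader(path);       // fall back to a fresh reader
    reader.seek(position);
    // ... read ...
    FileCacheService.instance.put(reader); // re-pool, or deallocate if over budget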

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 3210372..64c5cf7 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -186,6 +186,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }
 
+    public int getBufferSize()
+    {
+        return buffer.length;
+    }
+
     public void reset()
     {
         seek(markedPointer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
new file mode 100644
index 0000000..9b21de6
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.cassandra.service.FileCacheService;
+
+public class FileCacheMetrics
+{
+    /** Total number of hits */
+    public final Meter hits;
+    /** Total number of requests */
+    public final Meter requests;
+    /** hit rate */
+    public final Gauge<Double> hitRate;
+    /** Total size of file cache, in bytes */
+    public final Gauge<Long> size;
+
+    public FileCacheMetrics()
+    {
+        hits = Metrics.newMeter(new MetricName(FileCacheService.class, "Hits"), "hits", TimeUnit.SECONDS);
+        requests = Metrics.newMeter(new MetricName(FileCacheService.class, "Requests"), "requests", TimeUnit.SECONDS);
+        hitRate = Metrics.newGauge(new MetricName(FileCacheService.class, "HitRate"), new RatioGauge()
+        {
+            protected double getNumerator()
+            {
+                return hits.count();
+            }
+
+            protected double getDenominator()
+            {
+                return requests.count();
+            }
+        });
+        size = Metrics.newGauge(new MetricName(FileCacheService.class, "Size"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return FileCacheService.instance.sizeInBytes();
+            }
+        });
+    }
+}
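
HitRate is the lifetime ratio of the two meters, not a windowed rate; assuming
yammer's RatioGauge semantics, it reports NaN until the first request is marked.
In plain form:

    // what the HitRate gauge reports
    double hitRate = requests.count() == 0 ? Double.NaN
                                           : (double) hits.count() / requests.count();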

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
new file mode 100644
index 0000000..9dd1b15
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.FileCacheMetrics;
+
+public class FileCacheService
+{
+    private static final Logger logger = LoggerFactory.getLogger(FileCacheService.class);
+
+    private static final long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024 * 1024;
+    private static final int AFTER_ACCESS_EXPIRATION = 512; // in millis
+
+    public static FileCacheService instance = new FileCacheService();
+
+    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final FileCacheMetrics metrics = new FileCacheMetrics();
+    public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    {
+        @Override
+        public Queue<RandomAccessReader> call() throws Exception
+        {
+            return new ConcurrentLinkedQueue<RandomAccessReader>();
+        }
+    };
+
+    protected FileCacheService()
+    {
+        cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
+                            .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+                            .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+                            .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
+                            {
+                                @Override
+                                public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+                                {
+                                    Queue<RandomAccessReader> cachedInstances = notification.getValue();
+
+                                    if (cachedInstances == null)
+                                        return;
+
+                                    for (RandomAccessReader reader : cachedInstances)
+                                        reader.deallocate();
+                                }
+                            })
+                            .build();
+    }
+
+    public RandomAccessReader get(String path)
+    {
+        metrics.requests.mark();
+
+        Queue<RandomAccessReader> instances = getCacheFor(path);
+
+        if (instances == null)
+            return null;
+
+        RandomAccessReader result = instances.poll();
+
+        if (result != null)
+            metrics.hits.mark();
+
+        return result;
+    }
+
+    private Queue<RandomAccessReader> getCacheFor(String path)
+    {
+        try
+        {
+            return cache.get(path, cacheForPathCreator);
+        }
+        catch (ExecutionException e)
+        {
+            // if something bad happened, let's just carry on and return null
+            // as dysfunctional queue should not interrupt normal operation
+            logger.debug("Exception fetching cache", e);
+        }
+
+        return null;
+    }
+
+    public void put(RandomAccessReader instance)
+    {
+        // This wouldn't be precise sometimes when CRAR is used because
+        // there is a way for users to dynamically change the size of the buffer,
+        // but we don't expect that to happen frequently in production.
+        // Doing accounting this way also allows us to avoid atomic CAS operation on read path.
+        long memoryUsage = (cache.size() + 1) * instance.getBufferSize();
+
+        if (memoryUsage >= MEMORY_USAGE_THRESHOLD)
+            instance.deallocate();
+        else
+            getCacheFor(instance.getPath()).add(instance);
+    }
+
+    public void invalidate(String path)
+    {
+        cache.invalidate(path);
+    }
+
+    public long sizeInBytes()
+    {
+        long n = 0;
+        for (Queue<RandomAccessReader> queue : cache.asMap().values())
+            for (RandomAccessReader reader : queue)
+                n += reader.getBufferSize();
+        return n;
+    }
+}
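
Two design points in FileCacheService are worth restating. Eviction is left
entirely to Guava: a queue idle for 512 ms expires, and the removal listener
deallocates whatever readers the evicted queue still holds. Memory accounting
in put() is deliberately approximate: cache.size() counts cached paths, not
pooled readers, and is multiplied by a single reader's buffer size; per the
comment, that estimate is the price of avoiding an atomic CAS on the read
path. The underlying pattern, as a self-contained sketch (the generic element
type and release hook are placeholders, not Cassandra API):

    import java.util.Queue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.RemovalNotification;

    // Pool-of-pools: a cache of per-key object queues whose evicted queues
    // release their members, mirroring FileCacheService's structure.
    class KeyedPool<T>
    {
        interface Releaser<T> { void release(T t); }

        private final Cache<String, Queue<T>> pools;

        KeyedPool(final Releaser<T> releaser)
        {
            pools = CacheBuilder.newBuilder()
                    .expireAfterAccess(512, TimeUnit.MILLISECONDS) // same idle window
                    .removalListener(new RemovalListener<String, Queue<T>>()
                    {
                        public void onRemoval(RemovalNotification<String, Queue<T>> n)
                        {
                            Queue<T> q = n.getValue();
                            if (q == null)
                                return;
                            for (T t : q) // free whatever the evicted pool holds
                                releaser.release(t);
                        }
                    })
                    .build();
        }

        public T acquire(String key) // null means the caller must create one
        {
            Queue<T> q = poolFor(key);
            return q == null ? null : q.poll();
        }

        public void recycle(String key, T t)
        {
            Queue<T> q = poolFor(key);
            if (q != null)
                q.add(t);
        }

        private Queue<T> poolFor(String key)
        {
            try
            {
                return pools.get(key, new Callable<Queue<T>>()
                {
                    public Queue<T> call() { return new ConcurrentLinkedQueue<T>(); }
                });
            }
            catch (ExecutionException e)
            {
                return null; // a broken pool should not break normal operation
            }
        }
    }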


[6/6] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/494ba458
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/494ba458
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/494ba458

Branch: refs/heads/trunk
Commit: 494ba4582b9be1a391f9bcd7ac0924c3d0dcb76e
Parents: 3700e01 1e0d951
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:57:59 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:57:59 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   6 +-
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 231 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/494ba458/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/494ba458/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------


[2/6] git commit: add file_cache_size_in_mb setting patch by pyaskevich and jbellis for CASSANDRA-5661

Posted by jb...@apache.org.
add file_cache_size_in_mb setting
patch by pyaskevich and jbellis for CASSANDRA-5661


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfe49376
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfe49376
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfe49376

Branch: refs/heads/cassandra-2.0
Commit: dfe49376097cf73c04c0d8506782263a2e820cb0
Parents: 6e2401e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:49:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:53:11 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 229 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


[4/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e0d9513
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e0d9513
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e0d9513

Branch: refs/heads/trunk
Commit: 1e0d9513b748fae4ec0737283da71c65e9272102
Parents: cfa097c dfe4937
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:57:52 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:57:52 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   6 +-
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 231 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2192fe9,e4066ab..c2a55df
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
 -1.2.10
 +2.0.1
 + * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
 + * Improve leveled compaction's ability to find non-overlapping L0 compactions
 +   to work on concurrently (CASSANDRA-5921)
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
 + * Require superuser status for adding triggers (CASSANDRA-5963)
 + * Make standalone scrubber handle old and new style leveled manifest
 +   (CASSANDRA-6005)
 + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 + * Fix paged ranges with multiple replicas (CASSANDRA-6004)
 + * Fix potential AssertionError during tracing (CASSANDRA-6041)
 + * Fix NPE in sstablesplit (CASSANDRA-6027)
 +Merged from 1.2:
+  * add file_cache_size_in_mb setting (CASSANDRA-5661)
   * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
   * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
   * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index dd0728c,c5a4aa1..0414269
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -164,17 -166,16 +164,19 @@@ public class Confi
      public long row_cache_size_in_mb = 0;
      public volatile int row_cache_save_period = 0;
      public int row_cache_keys_to_save = Integer.MAX_VALUE;
 -    public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
 +    public String memory_allocator = NativeAllocator.class.getSimpleName();
      public boolean populate_io_cache_on_flush = false;
  
-     public boolean inter_dc_tcp_nodelay = false;
- 
 +    private static boolean isClientMode = false;
 +
 +    public boolean preheat_kernel_page_cache = false;
 +
+     public Integer file_cache_size_in_mb;
+ 
+     public boolean inter_dc_tcp_nodelay = true;
+ 
      public String memtable_allocator = "SlabAllocator";
  
 -    private static boolean loadYaml = true;
      private static boolean outboundBindAny = false;
  
      public static boolean getOutboundBindAny()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c2f3fa6,dbf0905..b3eeca6
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -112,372 -111,416 +112,375 @@@ public class DatabaseDescripto
          }
          catch (Exception e)
          {
 -            ClassLoader loader = DatabaseDescriptor.class.getClassLoader();
 -            url = loader.getResource(configUrl);
 -            if (url == null)
 -                throw new ConfigurationException("Cannot locate " + configUrl);
 +            logger.error("Fatal error during configuration loading", e);
 +            System.err.println(e.getMessage() + "\nFatal error during configuration loading; unable to start. See log for stacktrace.");
 +            System.exit(1);
          }
 -
 -        return url;
      }
  
 -    static
 +    @VisibleForTesting
 +    static Config loadConfig() throws ConfigurationException
      {
 -        if (Config.getLoadYaml())
 -            loadYaml();
 -        else
 -            conf = new Config();
 +        String loaderClass = System.getProperty("cassandra.config.loader");
 +        ConfigurationLoader loader = loaderClass == null
 +                                   ? new YamlConfigurationLoader()
 +                                   : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
 +        return loader.loadConfig();
      }
 -    static void loadYaml()
 +
 +    private static void applyConfig(Config config) throws ConfigurationException
      {
 -        try
 -        {
 -            URL url = getStorageConfigURL();
 -            logger.info("Loading settings from " + url);
 -            InputStream input;
 -            try
 -            {
 -                input = url.openStream();
 -            }
 -            catch (IOException e)
 -            {
 -                // getStorageConfigURL should have ruled this out
 -                throw new AssertionError(e);
 -            }
 -            org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
 -            TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
 -            seedDesc.putMapPropertyType("parameters", String.class, String.class);
 -            constructor.addTypeDescription(seedDesc);
 -            Yaml yaml = new Yaml(new Loader(constructor));
 -            conf = (Config)yaml.load(input);
 +        conf = config;
  
 -            logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 -            logger.info("Commit log directory: " + conf.commitlog_directory);
 +        logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 +        logger.info("Commit log directory: " + conf.commitlog_directory);
  
 -            if (conf.commitlog_sync == null)
 -            {
 -                throw new ConfigurationException("Missing required directive CommitLogSync");
 -            }
 +        if (conf.commitlog_sync == null)
 +        {
 +            throw new ConfigurationException("Missing required directive CommitLogSync");
 +        }
  
 -            if (conf.commitlog_sync == Config.CommitLogSync.batch)
 -            {
 -                if (conf.commitlog_sync_batch_window_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
 -                }
 -                else if (conf.commitlog_sync_period_in_ms != null)
 -                {
 -                    throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
 -                }
 -                logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 -            }
 -            else
 +        if (conf.commitlog_sync == Config.CommitLogSync.batch)
 +        {
 +            if (conf.commitlog_sync_batch_window_in_ms == null)
              {
 -                if (conf.commitlog_sync_period_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
 -                }
 -                else if (conf.commitlog_sync_batch_window_in_ms != null)
 -                {
 -                    throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
 -                }
 -                logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +                throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
              }
 -
 -            if (conf.commitlog_total_space_in_mb == null)
 -                conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
 -
 -            /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 -            if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +            else if (conf.commitlog_sync_period_in_ms != null)
              {
 -                conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
              }
 -            else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +            logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 +        }
 +        else
 +        {
 +            if (conf.commitlog_sync_period_in_ms == null)
              {
 -                conf.disk_access_mode = Config.DiskAccessMode.standard;
 -                indexAccessMode = Config.DiskAccessMode.mmap;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
              }
 -            else
 +            else if (conf.commitlog_sync_batch_window_in_ms != null)
              {
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
              }
 +            logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +        }
  
 -            logger.info("disk_failure_policy is " + conf.disk_failure_policy);
 +        if (conf.commitlog_total_space_in_mb == null)
 +            conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
  
 -            /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 -            if (conf.authenticator != null)
 -                authenticator = FBUtilities.newAuthenticator(conf.authenticator);
 +        /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 +        if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +        {
 +            conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +        {
 +            conf.disk_access_mode = Config.DiskAccessMode.standard;
 +            indexAccessMode = Config.DiskAccessMode.mmap;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else
 +        {
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
  
 -            if (conf.authority != null)
 -            {
 -                logger.warn("Please rename 'authority' to 'authorizer' in cassandra.yaml");
 -                if (!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority"))
 -                    throw new ConfigurationException("IAuthority interface has been deprecated,"
 -                                                     + " please implement IAuthorizer instead.");
 -            }
 +        logger.info("disk_failure_policy is " + conf.disk_failure_policy);
  
 -            if (conf.authorizer != null)
 -                authorizer = FBUtilities.newAuthorizer(conf.authorizer);
 +        /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 +        if (conf.authenticator != null)
 +            authenticator = FBUtilities.newAuthenticator(conf.authenticator);
  
 -            if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 -                throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
 +        if (conf.authorizer != null)
 +            authorizer = FBUtilities.newAuthorizer(conf.authorizer);
  
 -            if (conf.internode_authenticator != null)
 -                internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 -            else
 -                internodeAuthenticator = new AllowAllInternodeAuthenticator();
 +        if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 +            throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
  
 -            authenticator.validateConfiguration();
 -            authorizer.validateConfiguration();
 -            internodeAuthenticator.validateConfiguration();
 +        if (conf.internode_authenticator != null)
 +            internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 +        else
 +            internodeAuthenticator = new AllowAllInternodeAuthenticator();
  
 -            /* Hashing strategy */
 -            if (conf.partitioner == null)
 -            {
 -                throw new ConfigurationException("Missing directive: partitioner");
 -            }
 +        authenticator.validateConfiguration();
 +        authorizer.validateConfiguration();
 +        internodeAuthenticator.validateConfiguration();
  
 -            try
 -            {
 -                partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 -            }
 -            catch (Exception e)
 -            {
 -                throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 -            }
 -            paritionerName = partitioner.getClass().getCanonicalName();
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
 +        {
 +            throw new ConfigurationException("Missing directive: partitioner");
 +        }
 +        try
 +        {
 +            partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 +        }
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 +        }
 +        paritionerName = partitioner.getClass().getCanonicalName();
  
 -            /* phi convict threshold for FailureDetector */
 -            if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 -            {
 -                throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 -            }
 +        /* phi convict threshold for FailureDetector */
 +        if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 +        {
 +            throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 +        }
  
 -            /* Thread per pool */
 -            if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 -            {
 -                throw new ConfigurationException("concurrent_reads must be at least 2");
 -            }
 +        /* Thread per pool */
 +        if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 +        {
 +            throw new ConfigurationException("concurrent_reads must be at least 2");
 +        }
  
 -            if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 -            {
 -                throw new ConfigurationException("concurrent_writes must be at least 2");
 -            }
 +        if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 +        {
 +            throw new ConfigurationException("concurrent_writes must be at least 2");
 +        }
  
 -            if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 -            {
 -                throw new ConfigurationException("concurrent_replicates must be at least 2");
 -            }
 +        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 +        {
 +            throw new ConfigurationException("concurrent_replicates must be at least 2");
 +        }
 +
++        if (conf.file_cache_size_in_mb == null)
++            conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
+ 
 -            if (conf.file_cache_size_in_mb == null)
 -                conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
 +        if (conf.memtable_total_space_in_mb == null)
 +            conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 +        if (conf.memtable_total_space_in_mb <= 0)
 +            throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 +        logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
  
 -            if (conf.memtable_total_space_in_mb == null)
 -                conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 -            if (conf.memtable_total_space_in_mb <= 0)
 -                throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 -            logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
 +        /* Memtable flush writer threads */
 +        if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        {
 +            throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +        }
 +        else if (conf.memtable_flush_writers == null)
 +        {
 +            conf.memtable_flush_writers = conf.data_file_directories.length;
 +        }
  
 -            /* Memtable flush writer threads */
 -            if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        /* Local IP or hostname to bind services to */
 +        if (conf.listen_address != null)
 +        {
 +            try
              {
 -                throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +                listenAddress = InetAddress.getByName(conf.listen_address);
              }
 -            else if (conf.memtable_flush_writers == null)
 +            catch (UnknownHostException e)
              {
 -                conf.memtable_flush_writers = conf.data_file_directories.length;
 +                throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
              }
 +        }
  
 -            /* Local IP or hostname to bind services to */
 -            if (conf.listen_address != null)
 +        /* Gossip Address to broadcast */
 +        if (conf.broadcast_address != null)
 +        {
 +            if (conf.broadcast_address.equals("0.0.0.0"))
              {
 -                try
 -                {
 -                    listenAddress = InetAddress.getByName(conf.listen_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
 -                }
 +                throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
              }
  
 -            /* Gossip Address to broadcast */
 -            if (conf.broadcast_address != null)
 +            try
              {
 -                if (conf.broadcast_address.equals("0.0.0.0"))
 -                {
 -                    throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
 -                }
 -
 -                try
 -                {
 -                    broadcastAddress = InetAddress.getByName(conf.broadcast_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 -                }
 +                broadcastAddress = InetAddress.getByName(conf.broadcast_address);
              }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 +            }
 +        }
  
 -            /* Local IP or hostname to bind RPC server to */
 -            if (conf.rpc_address != null)
 +        /* Local IP or hostname to bind RPC server to */
 +        if (conf.rpc_address != null)
 +        {
 +            try
              {
 -                try
 -                {
 -                    rpcAddress = InetAddress.getByName(conf.rpc_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
 -                }
 +                rpcAddress = InetAddress.getByName(conf.rpc_address);
              }
 -            else
 +            catch (UnknownHostException e)
              {
 -                rpcAddress = FBUtilities.getLocalAddress();
 +                throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
              }
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
  
 -            if (conf.thrift_framed_transport_size_in_mb <= 0)
 -                throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
 +        if (conf.thrift_framed_transport_size_in_mb <= 0)
 +            throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
  
 -            /* end point snitch */
 -            if (conf.endpoint_snitch == null)
 -            {
 -                throw new ConfigurationException("Missing endpoint_snitch directive");
 -            }
 -            snitch = createEndpointSnitch(conf.endpoint_snitch);
 -            EndpointSnitchInfo.create();
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
 +        {
 +            throw new ConfigurationException("Missing endpoint_snitch directive");
 +        }
 +        snitch = createEndpointSnitch(conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
  
 -            localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 -            localComparator = new Comparator<InetAddress>()
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 +        localComparator = new Comparator<InetAddress>()
 +        {
 +            public int compare(InetAddress endpoint1, InetAddress endpoint2)
              {
 -                public int compare(InetAddress endpoint1, InetAddress endpoint2)
 -                {
 -                    boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 -                    boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 -                    if (local1 && !local2)
 -                        return -1;
 -                    if (local2 && !local1)
 -                        return 1;
 -                    return 0;
 -                }
 -            };
 +                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 +                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 +                if (local1 && !local2)
 +                    return -1;
 +                if (local2 && !local1)
 +                    return 1;
 +                return 0;
 +            }
 +        };
  
 -            /* Request Scheduler setup */
 -            requestSchedulerOptions = conf.request_scheduler_options;
 -            if (conf.request_scheduler != null)
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
              {
 -                try
 +                if (requestSchedulerOptions == null)
                  {
 -                    if (requestSchedulerOptions == null)
 -                    {
 -                        requestSchedulerOptions = new RequestSchedulerOptions();
 -                    }
 -                    Class<?> cls = Class.forName(conf.request_scheduler);
 -                    requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
                  }
 -                catch (ClassNotFoundException e)
 -                {
 -                    throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
 -                }
 -                catch (Exception e)
 -                {
 -                    throw new ConfigurationException("Unable to instantiate request scheduler", e);
 -                }
 -            }
 -            else
 -            {
 -                requestScheduler = new NoScheduler();
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
              }
 -
 -            if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +            catch (ClassNotFoundException e)
              {
 -                requestSchedulerId = conf.request_scheduler_id;
 +                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
              }
 -            else
 +            catch (Exception e)
              {
 -                // Default to Keyspace
 -                requestSchedulerId = RequestSchedulerId.keyspace;
 +                throw new ConfigurationException("Unable to instantiate request scheduler", e);
              }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
 +        }
  
 -            if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 -            {
 -                logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 -            }
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +        {
 +            requestSchedulerId = conf.request_scheduler_id;
 +        }
 +        else
 +        {
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
 +        }
  
 -            logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
 +        if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 +        {
 +            logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 +        }
  
 -            if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 -            {
 -                throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 -            }
 +        logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
  
 -            if (conf.concurrent_compactors == null)
 -                conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
 +        if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 +        {
 +            throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 +        }
  
 -            if (conf.concurrent_compactors <= 0)
 -                throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
 +        if (conf.concurrent_compactors == null)
 +            conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
  
 -            /* data file and commit log directories. they get created later, when they're needed. */
 -            if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 -            {
 -                for (String datadir : conf.data_file_directories)
 -                {
 -                    if (datadir.equals(conf.commitlog_directory))
 -                        throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 -                    if (datadir.equals(conf.saved_caches_directory))
 -                        throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
 -                }
 +        if (conf.concurrent_compactors <= 0)
 +            throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
  
 -                if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 -                    throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 -            }
 -            else
 +        /* data file and commit log directories. they get created later, when they're needed. */
 +        if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 +        {
 +            for (String datadir : conf.data_file_directories)
              {
 -                if (conf.commitlog_directory == null)
 -                    throw new ConfigurationException("commitlog_directory missing");
 -                if (conf.data_file_directories == null)
 -                    throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 -                if (conf.saved_caches_directory == null)
 -                    throw new ConfigurationException("saved_caches_directory missing");
 +                if (datadir.equals(conf.commitlog_directory))
 +                    throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 +                if (datadir.equals(conf.saved_caches_directory))
 +                    throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
              }
  
 -            if (conf.initial_token != null)
 -                for (String token : tokensFromString(conf.initial_token))
 -                    partitioner.getTokenFactory().validate(token);
 +            if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 +                throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 +        }
 +        else
 +        {
 +            if (conf.commitlog_directory == null)
 +                throw new ConfigurationException("commitlog_directory missing");
 +            if (conf.data_file_directories == null)
 +                throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 +            if (conf.saved_caches_directory == null)
 +                throw new ConfigurationException("saved_caches_directory missing");
 +        }
  
 -            try
 -            {
 -                // if key_cache_size_in_mb option was set to "auto" then the size of the cache should be "min(5% of Heap (in MB), 100MB)"
 -                keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 -                                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 -                                    : conf.key_cache_size_in_mb;
 +        if (conf.initial_token != null)
 +            for (String token : tokensFromString(conf.initial_token))
 +                partitioner.getTokenFactory().validate(token);
  
 -                if (keyCacheSizeInMB < 0)
 -                    throw new NumberFormatException(); // to escape duplicating error message
 -            }
 -            catch (NumberFormatException e)
 -            {
 -                throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 -                                                 + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 -            }
 +        try
 +        {
 +            // if key_cache_size_in_mb option was set to "auto" then the size of the cache should be "min(5% of Heap (in MB), 100MB)"
 +            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 +                ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 +                : conf.key_cache_size_in_mb;
 +
 +            if (keyCacheSizeInMB < 0)
 +                throw new NumberFormatException(); // to escape duplicating error message
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 +                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 +        }
  
 -            rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider);
 +        memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
  
 -            if(conf.encryption_options != null)
 -            {
 -                logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 -                //operate under the assumption that server_encryption_options is not set in yaml rather than both
 -                conf.server_encryption_options = conf.encryption_options;
 -            }
 +        if(conf.encryption_options != null)
 +        {
 +            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 +            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 +            conf.server_encryption_options = conf.encryption_options;
 +        }
  
 -            String allocatorClass = conf.memtable_allocator;
 -            if (!allocatorClass.contains("."))
 -                allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
 -            memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
 +        String allocatorClass = conf.memtable_allocator;
 +        if (!allocatorClass.contains("."))
 +            allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
 +        memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
  
 -            // Hardcoded system tables
 -            List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 -            assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 -            for (KSMetaData ksmd : systemKeyspaces)
 -            {
 -                // install the definition
 -                for (CFMetaData cfm : ksmd.cfMetaData().values())
 -                    Schema.instance.load(cfm);
 -                Schema.instance.setTableDefinition(ksmd);
 -            }
 +        // Hardcoded system keyspaces
 +        List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 +        assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 +        for (KSMetaData ksmd : systemKeyspaces)
 +        {
 +            // install the definition
 +            for (CFMetaData cfm : ksmd.cfMetaData().values())
 +                Schema.instance.load(cfm);
 +            Schema.instance.setKeyspaceDefinition(ksmd);
 +        }
  
 -            /* Load the seeds for node contact points */
 -            if (conf.seed_provider == null)
 -            {
 -                throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
 -            }
 -            try
 -            {
 -                Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 -                seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 -            }
 -            // there are about 5 checked exceptions that could be thrown here.
 -            catch (Exception e)
 -            {
 -                logger.error("Fatal configuration error", e);
 -                System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -                System.exit(1);
 -            }
 -            if (seedProvider.getSeeds().size() == 0)
 -                throw new ConfigurationException("The seed provider lists no seeds.");
 +        /* Load the seeds for node contact points */
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
          }
 -        catch (ConfigurationException e)
 +        try
          {
 -            logger.error("Fatal configuration error", e);
 -            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -            System.exit(1);
 +            Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 +            seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
          }
 -        catch (YAMLException e)
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
          {
 -            logger.error("Fatal configuration error error", e);
 -            System.err.println(e.getMessage() + "\nInvalid yaml; unable to start server.  See log for stacktrace.");
 +            logger.error("Fatal configuration error", e);
 +            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
              System.exit(1);
          }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no seeds.");
      }
  
      private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
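
For concreteness, the heap-derived defaults enforced in the hunk above work
out like this; a minimal sketch assuming an 8GB heap (the class name and the
heap figure are illustrative, and the real code reads the JVM's limits from
Runtime at startup):

    public class HeapDefaultsSketch
    {
        public static void main(String[] args)
        {
            long heapBytes = 8L * 1024 * 1048576; // pretend -Xmx8G; illustrative

            // memtable_total_space_in_mb defaults to 1/3 of the heap: 8192 / 3 -> 2730
            int memtableTotalMb = (int) (heapBytes / (3 * 1048576));

            // key_cache_size_in_mb defaults to min(max(1, 5% of heap in MB), 100) -> 100
            int keyCacheMb = Math.min(Math.max(1, (int) (heapBytes * 0.05 / 1024 / 1024)), 100);

            System.out.println(memtableTotalMb + "MB memtables, " + keyCacheMb + "MB key cache");
        }
    }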

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------


[5/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e0d9513
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e0d9513
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e0d9513

Branch: refs/heads/cassandra-2.0
Commit: 1e0d9513b748fae4ec0737283da71c65e9272102
Parents: cfa097c dfe4937
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:57:52 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:57:52 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   6 +-
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 231 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
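
The functional change this merge carries is the new file_cache_size_in_mb
default, added in the DatabaseDescriptor hunk further down: when the option
is absent, Cassandra uses the smaller of 512MB or a quarter of the max heap.
A minimal standalone sketch of that rule (the class and method names here
are ours for illustration, not part of the patch):

    public class FileCacheDefaultSketch
    {
        static int defaultFileCacheSizeInMb()
        {
            // min(512, heap/4 in MB), matching the applyConfig hunk below
            long maxHeapBytes = Runtime.getRuntime().maxMemory();
            return Math.min(512, (int) (maxHeapBytes / (4 * 1048576)));
        }

        public static void main(String[] args)
        {
            System.out.println(defaultFileCacheSizeInMb() + "MB file cache");
        }
    }

So a 4GB heap yields the 512MB cap, while a 1GB heap yields 256MB.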


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2192fe9,e4066ab..c2a55df
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,25 -1,5 +1,26 @@@
 -1.2.10
 +2.0.1
 + * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
 + * Improve leveled compaction's ability to find non-overlapping L0 compactions
 +   to work on concurrently (CASSANDRA-5921)
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
 + * Require superuser status for adding triggers (CASSANDRA-5963)
 + * Make standalone scrubber handle old and new style leveled manifest
 +   (CASSANDRA-6005)
 + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 + * Fix paged ranges with multiple replicas (CASSANDRA-6004)
 + * Fix potential AssertionError during tracing (CASSANDRA-6041)
 + * Fix NPE in sstablesplit (CASSANDRA-6027)
 +Merged from 1.2:
+  * add file_cache_size_in_mb setting (CASSANDRA-5661)
   * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
   * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
   * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index dd0728c,c5a4aa1..0414269
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -164,17 -166,16 +164,19 @@@ public class Config
      public long row_cache_size_in_mb = 0;
      public volatile int row_cache_save_period = 0;
      public int row_cache_keys_to_save = Integer.MAX_VALUE;
 -    public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
 +    public String memory_allocator = NativeAllocator.class.getSimpleName();
      public boolean populate_io_cache_on_flush = false;
  
-     public boolean inter_dc_tcp_nodelay = false;
- 
 +    private static boolean isClientMode = false;
 +
 +    public boolean preheat_kernel_page_cache = false;
 +
+     public Integer file_cache_size_in_mb;
+ 
+     public boolean inter_dc_tcp_nodelay = true;
+ 
      public String memtable_allocator = "SlabAllocator";
  
 -    private static boolean loadYaml = true;
      private static boolean outboundBindAny = false;
  
      public static boolean getOutboundBindAny()
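
With the Config field above in place, the buffer-pool cap can be set
explicitly in cassandra.yaml, and leaving it unset falls through to the
computed default. A hedged example (the 256 figure is arbitrary, purely for
illustration):

    # cassandra.yaml
    file_cache_size_in_mb: 256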

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c2f3fa6,dbf0905..b3eeca6
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -112,372 -111,416 +112,375 @@@ public class DatabaseDescriptor
          }
          catch (Exception e)
          {
 -            ClassLoader loader = DatabaseDescriptor.class.getClassLoader();
 -            url = loader.getResource(configUrl);
 -            if (url == null)
 -                throw new ConfigurationException("Cannot locate " + configUrl);
 +            logger.error("Fatal error during configuration loading", e);
 +            System.err.println(e.getMessage() + "\nFatal error during configuration loading; unable to start. See log for stacktrace.");
 +            System.exit(1);
          }
 -
 -        return url;
      }
  
 -    static
 +    @VisibleForTesting
 +    static Config loadConfig() throws ConfigurationException
      {
 -        if (Config.getLoadYaml())
 -            loadYaml();
 -        else
 -            conf = new Config();
 +        String loaderClass = System.getProperty("cassandra.config.loader");
 +        ConfigurationLoader loader = loaderClass == null
 +                                   ? new YamlConfigurationLoader()
 +                                   : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
 +        return loader.loadConfig();
      }
 -    static void loadYaml()
 +
 +    private static void applyConfig(Config config) throws ConfigurationException
      {
 -        try
 -        {
 -            URL url = getStorageConfigURL();
 -            logger.info("Loading settings from " + url);
 -            InputStream input;
 -            try
 -            {
 -                input = url.openStream();
 -            }
 -            catch (IOException e)
 -            {
 -                // getStorageConfigURL should have ruled this out
 -                throw new AssertionError(e);
 -            }
 -            org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
 -            TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
 -            seedDesc.putMapPropertyType("parameters", String.class, String.class);
 -            constructor.addTypeDescription(seedDesc);
 -            Yaml yaml = new Yaml(new Loader(constructor));
 -            conf = (Config)yaml.load(input);
 +        conf = config;
  
 -            logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 -            logger.info("Commit log directory: " + conf.commitlog_directory);
 +        logger.info("Data files directories: " + Arrays.toString(conf.data_file_directories));
 +        logger.info("Commit log directory: " + conf.commitlog_directory);
  
 -            if (conf.commitlog_sync == null)
 -            {
 -                throw new ConfigurationException("Missing required directive CommitLogSync");
 -            }
 +        if (conf.commitlog_sync == null)
 +        {
 +            throw new ConfigurationException("Missing required directive CommitLogSync");
 +        }
  
 -            if (conf.commitlog_sync == Config.CommitLogSync.batch)
 -            {
 -                if (conf.commitlog_sync_batch_window_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
 -                }
 -                else if (conf.commitlog_sync_period_in_ms != null)
 -                {
 -                    throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
 -                }
 -                logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 -            }
 -            else
 +        if (conf.commitlog_sync == Config.CommitLogSync.batch)
 +        {
 +            if (conf.commitlog_sync_batch_window_in_ms == null)
              {
 -                if (conf.commitlog_sync_period_in_ms == null)
 -                {
 -                    throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
 -                }
 -                else if (conf.commitlog_sync_batch_window_in_ms != null)
 -                {
 -                    throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
 -                }
 -                logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +                throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected.");
              }
 -
 -            if (conf.commitlog_total_space_in_mb == null)
 -                conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
 -
 -            /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 -            if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +            else if (conf.commitlog_sync_period_in_ms != null)
              {
 -                conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync");
              }
 -            else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +            logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms);
 +        }
 +        else
 +        {
 +            if (conf.commitlog_sync_period_in_ms == null)
              {
 -                conf.disk_access_mode = Config.DiskAccessMode.standard;
 -                indexAccessMode = Config.DiskAccessMode.mmap;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected");
              }
 -            else
 +            else if (conf.commitlog_sync_batch_window_in_ms != null)
              {
 -                indexAccessMode = conf.disk_access_mode;
 -                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +                throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found.  Only specify commitlog_sync_period_in_ms when using periodic sync.");
              }
 +            logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms);
 +        }
  
 -            logger.info("disk_failure_policy is " + conf.disk_failure_policy);
 +        if (conf.commitlog_total_space_in_mb == null)
 +            conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32;
  
 -            /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 -            if (conf.authenticator != null)
 -                authenticator = FBUtilities.newAuthenticator(conf.authenticator);
 +        /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */
 +        if (conf.disk_access_mode == Config.DiskAccessMode.auto)
 +        {
 +            conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard;
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only)
 +        {
 +            conf.disk_access_mode = Config.DiskAccessMode.standard;
 +            indexAccessMode = Config.DiskAccessMode.mmap;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
 +        else
 +        {
 +            indexAccessMode = conf.disk_access_mode;
 +            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
 +        }
  
 -            if (conf.authority != null)
 -            {
 -                logger.warn("Please rename 'authority' to 'authorizer' in cassandra.yaml");
 -                if (!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority"))
 -                    throw new ConfigurationException("IAuthority interface has been deprecated,"
 -                                                     + " please implement IAuthorizer instead.");
 -            }
 +        logger.info("disk_failure_policy is " + conf.disk_failure_policy);
  
 -            if (conf.authorizer != null)
 -                authorizer = FBUtilities.newAuthorizer(conf.authorizer);
 +        /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
 +        if (conf.authenticator != null)
 +            authenticator = FBUtilities.newAuthenticator(conf.authenticator);
  
 -            if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 -                throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
 +        if (conf.authorizer != null)
 +            authorizer = FBUtilities.newAuthorizer(conf.authorizer);
  
 -            if (conf.internode_authenticator != null)
 -                internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 -            else
 -                internodeAuthenticator = new AllowAllInternodeAuthenticator();
 +        if (authenticator instanceof AllowAllAuthenticator && !(authorizer instanceof AllowAllAuthorizer))
 +            throw new ConfigurationException("AllowAllAuthenticator can't be used with " +  conf.authorizer);
  
 -            authenticator.validateConfiguration();
 -            authorizer.validateConfiguration();
 -            internodeAuthenticator.validateConfiguration();
 +        if (conf.internode_authenticator != null)
 +            internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
 +        else
 +            internodeAuthenticator = new AllowAllInternodeAuthenticator();
  
 -            /* Hashing strategy */
 -            if (conf.partitioner == null)
 -            {
 -                throw new ConfigurationException("Missing directive: partitioner");
 -            }
 +        authenticator.validateConfiguration();
 +        authorizer.validateConfiguration();
 +        internodeAuthenticator.validateConfiguration();
  
 -            try
 -            {
 -                partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 -            }
 -            catch (Exception e)
 -            {
 -                throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 -            }
 -            paritionerName = partitioner.getClass().getCanonicalName();
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
 +        {
 +            throw new ConfigurationException("Missing directive: partitioner");
 +        }
 +        try
 +        {
 +            partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner));
 +        }
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException("Invalid partitioner class " + conf.partitioner);
 +        }
 +        paritionerName = partitioner.getClass().getCanonicalName();
  
 -            /* phi convict threshold for FailureDetector */
 -            if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 -            {
 -                throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 -            }
 +        /* phi convict threshold for FailureDetector */
 +        if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
 +        {
 +            throw new ConfigurationException("phi_convict_threshold must be between 5 and 16");
 +        }
  
 -            /* Thread per pool */
 -            if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 -            {
 -                throw new ConfigurationException("concurrent_reads must be at least 2");
 -            }
 +        /* Thread per pool */
 +        if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
 +        {
 +            throw new ConfigurationException("concurrent_reads must be at least 2");
 +        }
  
 -            if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 -            {
 -                throw new ConfigurationException("concurrent_writes must be at least 2");
 -            }
 +        if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
 +        {
 +            throw new ConfigurationException("concurrent_writes must be at least 2");
 +        }
  
 -            if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 -            {
 -                throw new ConfigurationException("concurrent_replicates must be at least 2");
 -            }
 +        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2)
 +        {
 +            throw new ConfigurationException("concurrent_replicates must be at least 2");
 +        }
 +
++        if (conf.file_cache_size_in_mb == null)
++            conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
+ 
 -            if (conf.file_cache_size_in_mb == null)
 -                conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
 +        if (conf.memtable_total_space_in_mb == null)
 +            conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 +        if (conf.memtable_total_space_in_mb <= 0)
 +            throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 +        logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
  
 -            if (conf.memtable_total_space_in_mb == null)
 -                conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
 -            if (conf.memtable_total_space_in_mb <= 0)
 -                throw new ConfigurationException("memtable_total_space_in_mb must be positive");
 -            logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb);
 +        /* Memtable flush writer threads */
 +        if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        {
 +            throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +        }
 +        else if (conf.memtable_flush_writers == null)
 +        {
 +            conf.memtable_flush_writers = conf.data_file_directories.length;
 +        }
  
 -            /* Memtable flush writer threads */
 -            if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
 +        /* Local IP or hostname to bind services to */
 +        if (conf.listen_address != null)
 +        {
 +            try
              {
 -                throw new ConfigurationException("memtable_flush_writers must be at least 1");
 +                listenAddress = InetAddress.getByName(conf.listen_address);
              }
 -            else if (conf.memtable_flush_writers == null)
 +            catch (UnknownHostException e)
              {
 -                conf.memtable_flush_writers = conf.data_file_directories.length;
 +                throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
              }
 +        }
  
 -            /* Local IP or hostname to bind services to */
 -            if (conf.listen_address != null)
 +        /* Gossip Address to broadcast */
 +        if (conf.broadcast_address != null)
 +        {
 +            if (conf.broadcast_address.equals("0.0.0.0"))
              {
 -                try
 -                {
 -                    listenAddress = InetAddress.getByName(conf.listen_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'");
 -                }
 +                throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
              }
  
 -            /* Gossip Address to broadcast */
 -            if (conf.broadcast_address != null)
 +            try
              {
 -                if (conf.broadcast_address.equals("0.0.0.0"))
 -                {
 -                    throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!");
 -                }
 -
 -                try
 -                {
 -                    broadcastAddress = InetAddress.getByName(conf.broadcast_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 -                }
 +                broadcastAddress = InetAddress.getByName(conf.broadcast_address);
              }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'");
 +            }
 +        }
  
 -            /* Local IP or hostname to bind RPC server to */
 -            if (conf.rpc_address != null)
 +        /* Local IP or hostname to bind RPC server to */
 +        if (conf.rpc_address != null)
 +        {
 +            try
              {
 -                try
 -                {
 -                    rpcAddress = InetAddress.getByName(conf.rpc_address);
 -                }
 -                catch (UnknownHostException e)
 -                {
 -                    throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
 -                }
 +                rpcAddress = InetAddress.getByName(conf.rpc_address);
              }
 -            else
 +            catch (UnknownHostException e)
              {
 -                rpcAddress = FBUtilities.getLocalAddress();
 +                throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address);
              }
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
  
 -            if (conf.thrift_framed_transport_size_in_mb <= 0)
 -                throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
 +        if (conf.thrift_framed_transport_size_in_mb <= 0)
 +            throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
  
 -            /* end point snitch */
 -            if (conf.endpoint_snitch == null)
 -            {
 -                throw new ConfigurationException("Missing endpoint_snitch directive");
 -            }
 -            snitch = createEndpointSnitch(conf.endpoint_snitch);
 -            EndpointSnitchInfo.create();
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
 +        {
 +            throw new ConfigurationException("Missing endpoint_snitch directive");
 +        }
 +        snitch = createEndpointSnitch(conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
  
 -            localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 -            localComparator = new Comparator<InetAddress>()
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 +        localComparator = new Comparator<InetAddress>()
 +        {
 +            public int compare(InetAddress endpoint1, InetAddress endpoint2)
              {
 -                public int compare(InetAddress endpoint1, InetAddress endpoint2)
 -                {
 -                    boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 -                    boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 -                    if (local1 && !local2)
 -                        return -1;
 -                    if (local2 && !local1)
 -                        return 1;
 -                    return 0;
 -                }
 -            };
 +                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 +                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 +                if (local1 && !local2)
 +                    return -1;
 +                if (local2 && !local1)
 +                    return 1;
 +                return 0;
 +            }
 +        };
  
 -            /* Request Scheduler setup */
 -            requestSchedulerOptions = conf.request_scheduler_options;
 -            if (conf.request_scheduler != null)
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
              {
 -                try
 +                if (requestSchedulerOptions == null)
                  {
 -                    if (requestSchedulerOptions == null)
 -                    {
 -                        requestSchedulerOptions = new RequestSchedulerOptions();
 -                    }
 -                    Class<?> cls = Class.forName(conf.request_scheduler);
 -                    requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
                  }
 -                catch (ClassNotFoundException e)
 -                {
 -                    throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
 -                }
 -                catch (Exception e)
 -                {
 -                    throw new ConfigurationException("Unable to instantiate request scheduler", e);
 -                }
 -            }
 -            else
 -            {
 -                requestScheduler = new NoScheduler();
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
              }
 -
 -            if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +            catch (ClassNotFoundException e)
              {
 -                requestSchedulerId = conf.request_scheduler_id;
 +                throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler);
              }
 -            else
 +            catch (Exception e)
              {
 -                // Default to Keyspace
 -                requestSchedulerId = RequestSchedulerId.keyspace;
 +                throw new ConfigurationException("Unable to instantiate request scheduler", e);
              }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
 +        }
  
 -            if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 -            {
 -                logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 -            }
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +        {
 +            requestSchedulerId = conf.request_scheduler_id;
 +        }
 +        else
 +        {
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
 +        }
  
 -            logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
 +        if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
 +        {
 +            logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
 +        }
  
 -            if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 -            {
 -                throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 -            }
 +        logger.info((conf.multithreaded_compaction ? "" : "Not ") + "using multi-threaded compaction");
  
 -            if (conf.concurrent_compactors == null)
 -                conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
 +        if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0)
 +        {
 +            throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer");
 +        }
  
 -            if (conf.concurrent_compactors <= 0)
 -                throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
 +        if (conf.concurrent_compactors == null)
 +            conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
  
 -            /* data file and commit log directories. they get created later, when they're needed. */
 -            if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 -            {
 -                for (String datadir : conf.data_file_directories)
 -                {
 -                    if (datadir.equals(conf.commitlog_directory))
 -                        throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 -                    if (datadir.equals(conf.saved_caches_directory))
 -                        throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
 -                }
 +        if (conf.concurrent_compactors <= 0)
 +            throw new ConfigurationException("concurrent_compactors should be strictly greater than 0");
  
 -                if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 -                    throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 -            }
 -            else
 +        /* data file and commit log directories. they get created later, when they're needed. */
 +        if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null)
 +        {
 +            for (String datadir : conf.data_file_directories)
              {
 -                if (conf.commitlog_directory == null)
 -                    throw new ConfigurationException("commitlog_directory missing");
 -                if (conf.data_file_directories == null)
 -                    throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 -                if (conf.saved_caches_directory == null)
 -                    throw new ConfigurationException("saved_caches_directory missing");
 +                if (datadir.equals(conf.commitlog_directory))
 +                    throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories");
 +                if (datadir.equals(conf.saved_caches_directory))
 +                    throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories");
              }
  
 -            if (conf.initial_token != null)
 -                for (String token : tokensFromString(conf.initial_token))
 -                    partitioner.getTokenFactory().validate(token);
 +            if (conf.commitlog_directory.equals(conf.saved_caches_directory))
 +                throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory");
 +        }
 +        else
 +        {
 +            if (conf.commitlog_directory == null)
 +                throw new ConfigurationException("commitlog_directory missing");
 +            if (conf.data_file_directories == null)
 +                throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified");
 +            if (conf.saved_caches_directory == null)
 +                throw new ConfigurationException("saved_caches_directory missing");
 +        }
  
 -            try
 -            {
 -                // if key_cache_size_in_mb option was set to "auto" then the size of the cache should be "min(5% of Heap (in MB), 100MB)"
 -                keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 -                                    ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 -                                    : conf.key_cache_size_in_mb;
 +        if (conf.initial_token != null)
 +            for (String token : tokensFromString(conf.initial_token))
 +                partitioner.getTokenFactory().validate(token);
  
 -                if (keyCacheSizeInMB < 0)
 -                    throw new NumberFormatException(); // to escape duplicating error message
 -            }
 -            catch (NumberFormatException e)
 -            {
 -                throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 -                                                 + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 -            }
 +        try
 +        {
 +            // if key_cache_size_in_mb option was set to "auto" then the size of the cache should be "min(5% of Heap (in MB), 100MB)"
 +            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
 +                ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
 +                : conf.key_cache_size_in_mb;
 +
 +            if (keyCacheSizeInMB < 0)
 +                throw new NumberFormatException(); // to escape duplicating error message
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '"
 +                    + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.");
 +        }
  
 -            rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider);
 +        memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator);
  
 -            if(conf.encryption_options != null)
 -            {
 -                logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 -                //operate under the assumption that server_encryption_options is not set in yaml rather than both
 -                conf.server_encryption_options = conf.encryption_options;
 -            }
 +        if(conf.encryption_options != null)
 +        {
 +            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
 +            //operate under the assumption that server_encryption_options is not set in yaml rather than both
 +            conf.server_encryption_options = conf.encryption_options;
 +        }
  
 -            String allocatorClass = conf.memtable_allocator;
 -            if (!allocatorClass.contains("."))
 -                allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
 -            memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
 +        String allocatorClass = conf.memtable_allocator;
 +        if (!allocatorClass.contains("."))
 +            allocatorClass = "org.apache.cassandra.utils." + allocatorClass;
 +        memtableAllocator = FBUtilities.classForName(allocatorClass, "allocator");
  
 -            // Hardcoded system tables
 -            List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 -            assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 -            for (KSMetaData ksmd : systemKeyspaces)
 -            {
 -                // install the definition
 -                for (CFMetaData cfm : ksmd.cfMetaData().values())
 -                    Schema.instance.load(cfm);
 -                Schema.instance.setTableDefinition(ksmd);
 -            }
 +        // Hardcoded system keyspaces
 +        List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
 +        assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
 +        for (KSMetaData ksmd : systemKeyspaces)
 +        {
 +            // install the definition
 +            for (CFMetaData cfm : ksmd.cfMetaData().values())
 +                Schema.instance.load(cfm);
 +            Schema.instance.setKeyspaceDefinition(ksmd);
 +        }
  
 -            /* Load the seeds for node contact points */
 -            if (conf.seed_provider == null)
 -            {
 -                throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
 -            }
 -            try
 -            {
 -                Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 -                seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 -            }
 -            // there are about 5 checked exceptions that could be thrown here.
 -            catch (Exception e)
 -            {
 -                logger.error("Fatal configuration error", e);
 -                System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -                System.exit(1);
 -            }
 -            if (seedProvider.getSeeds().size() == 0)
 -                throw new ConfigurationException("The seed provider lists no seeds.");
 +        /* Load the seeds for node contact points */
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.");
          }
 -        catch (ConfigurationException e)
 +        try
          {
 -            logger.error("Fatal configuration error", e);
 -            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
 -            System.exit(1);
 +            Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name);
 +            seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
          }
 -        catch (YAMLException e)
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
          {
 -            logger.error("Fatal configuration error error", e);
 -            System.err.println(e.getMessage() + "\nInvalid yaml; unable to start server.  See log for stacktrace.");
 +            logger.error("Fatal configuration error", e);
 +            System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server.  See log for stacktrace.");
              System.exit(1);
          }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no seeds.");
      }
  
      private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
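
For reference, the reflective load above requires a provider class exposing a
constructor that takes a Map of parameters from the yaml. A minimal sketch of a
conforming implementation (the class name and single-seed behavior are
hypothetical, not from this patch):

    import java.net.InetAddress;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.cassandra.locator.SeedProvider;

    public class SingleSeedProvider implements SeedProvider
    {
        private final List<InetAddress> seeds;

        public SingleSeedProvider(Map<String, String> parameters) throws Exception
        {
            // e.g. parameters = {seeds: "10.0.0.1"}, taken from conf.seed_provider.parameters
            seeds = Arrays.asList(InetAddress.getByName(parameters.get("seeds")));
        }

        public List<InetAddress> getSeeds()
        {
            return seeds;
        }
    }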

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e0d9513/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------


[3/6] git commit: add file_cache_size_in_mb setting patch by pyaskevich and jbellis for CASSANDRA-5661

Posted by jb...@apache.org.
add file_cache_size_in_mb setting
patch by pyaskevich and jbellis for CASSANDRA-5661


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfe49376
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfe49376
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfe49376

Branch: refs/heads/trunk
Commit: dfe49376097cf73c04c0d8506782263a2e820cb0
Parents: 6e2401e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 16:49:11 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 16:53:11 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   4 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 ++
 .../cassandra/io/util/PoolingSegmentedFile.java |  14 +-
 .../cassandra/io/util/RandomAccessReader.java   |   5 +
 .../cassandra/metrics/FileCacheMetrics.java     |  64 +++++++++
 .../cassandra/service/FileCacheService.java     | 139 +++++++++++++++++++
 8 files changed, 229 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb4f3f4..e4066ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * add file_cache_size_in_mb setting (CASSANDRA-5661)
  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 27ac09b..2916ed9 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -275,6 +275,10 @@ reduce_cache_capacity_to: 0.6
 concurrent_reads: 32
 concurrent_writes: 32
 
+# Total memory to use for sstable-reading buffers.  Defaults to
+# the smaller of 1/4 of heap or 512MB.
+# file_cache_size_in_mb: 512
+
 # Total memory to use for memtables.  Cassandra will flush the largest
 # memtable when this much memory is used.
 # If omitted, Cassandra will set it to 1/3 of the heap.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a924a4c..c5a4aa1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -169,6 +169,8 @@ public class Config
     public String row_cache_provider = SerializingCacheProvider.class.getSimpleName();
     public boolean populate_io_cache_on_flush = false;
 
+    public Integer file_cache_size_in_mb;
+
     public boolean inter_dc_tcp_nodelay = true;
 
     public String memtable_allocator = "SlabAllocator";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8e3cbe2..dbf0905 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -272,6 +272,9 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("concurrent_replicates must be at least 2");
             }
 
+            if (conf.file_cache_size_in_mb == null)
+                conf.file_cache_size_in_mb = Math.min(512, (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576)));
+
             if (conf.memtable_total_space_in_mb == null)
                 conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
             if (conf.memtable_total_space_in_mb <= 0)
@@ -1209,6 +1212,11 @@ public class DatabaseDescriptor
         return conf.memtable_flush_queue_size;
     }
 
+    public static int getFileCacheSizeInMB()
+    {
+        return conf.file_cache_size_in_mb;
+    }
+
     public static int getTotalMemtableSpaceInMB()
     {
         // should only be called if estimatesRealMemtableSize() is true
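
For context, a rough walk-through of how the file cache default resolves (a
minimal sketch, not part of the patch; the heap sizes are hypothetical):

    // file_cache_size_in_mb defaults to min(512, heap / 4), in megabytes:
    long maxMemory = Runtime.getRuntime().maxMemory();
    int fileCacheSizeInMB = Math.min(512, (int) (maxMemory / (4 * 1048576)));
    // 8 GB heap -> min(512, 2048) = 512 MB
    // 1 GB heap -> min(512,  256) = 256 MB
    // an explicit file_cache_size_in_mb in cassandra.yaml bypasses this computation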

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index 4173d5a..892611c 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.cassandra.service.FileCacheService;
 
 public abstract class PoolingSegmentedFile extends SegmentedFile
 {
-    public final Queue<RandomAccessReader> pool = new ConcurrentLinkedQueue<RandomAccessReader>();
-
     protected PoolingSegmentedFile(String path, long length)
     {
         super(path, length);
@@ -36,9 +33,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public FileDataInput getSegment(long position)
     {
-        RandomAccessReader reader = pool.poll();
+        RandomAccessReader reader = FileCacheService.instance.get(path);
+
         if (reader == null)
             reader = createReader(path);
+
         reader.seek(position);
         return reader;
     }
@@ -47,12 +46,11 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     public void recycle(RandomAccessReader reader)
     {
-        pool.add(reader);
+        FileCacheService.instance.put(reader);
     }
 
     public void cleanup()
     {
-        for (RandomAccessReader reader : pool)
-            reader.deallocate();
+        FileCacheService.instance.invalidate(path);
     }
 }
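
The net effect on the read path, as a hedged sketch (path and position stand in
for the real arguments; error handling elided):

    // getSegment(): try the shared cache first, fall back to a fresh reader.
    RandomAccessReader reader = FileCacheService.instance.get(path); // null on a miss
    if (reader == null)
        reader = createReader(path);
    reader.seek(position);
    // ... read from the segment ...
    FileCacheService.instance.put(reader); // what recycle() now does: back to the
                                           // per-path queue, or deallocated if over budget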

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 3210372..64c5cf7 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -186,6 +186,11 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         return filePath;
     }
 
+    public int getBufferSize()
+    {
+        return buffer.length;
+    }
+
     public void reset()
     {
         seek(markedPointer);
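
getBufferSize() is exposed so FileCacheService can estimate pooled memory from
buffer sizes alone. A sketch of the accounting it enables (the collection and
the 64 KB figure are illustrative, not from this patch):

    long pooledBytes = 0;
    for (RandomAccessReader r : pooledReaders) // hypothetical Iterable<RandomAccessReader>
        pooledBytes += r.getBufferSize();      // e.g. 100 readers * 65536 B ~= 6.5 MB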

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
new file mode 100644
index 0000000..9b21de6
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/FileCacheMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.cassandra.service.FileCacheService;
+
+public class FileCacheMetrics
+{
+    /** Total number of hits */
+    public final Meter hits;
+    /** Total number of requests */
+    public final Meter requests;
+    /** hit rate */
+    public final Gauge<Double> hitRate;
+    /** Total size of file cache, in bytes */
+    public final Gauge<Long> size;
+
+    public FileCacheMetrics()
+    {
+        hits = Metrics.newMeter(new MetricName(FileCacheService.class, "Hits"), "hits", TimeUnit.SECONDS);
+        requests = Metrics.newMeter(new MetricName(FileCacheService.class, "Requests"), "requests", TimeUnit.SECONDS);
+        hitRate = Metrics.newGauge(new MetricName(FileCacheService.class, "HitRate"), new RatioGauge()
+        {
+            protected double getNumerator()
+            {
+                return hits.count();
+            }
+
+            protected double getDenominator()
+            {
+                return requests.count();
+            }
+        });
+        size = Metrics.newGauge(new MetricName(FileCacheService.class, "Size"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return FileCacheService.instance.sizeInBytes();
+            }
+        });
+    }
+}
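
A sketch of reading these counters in-process (FileCacheService keeps its
FileCacheMetrics instance private, so external consumers would normally go
through the metrics registry or JMX; constructing one directly here is just for
illustration):

    FileCacheMetrics metrics = new FileCacheMetrics();
    long hits = metrics.hits.count();          // cumulative cache hits
    long requests = metrics.requests.count();  // cumulative lookups
    Double hitRate = metrics.hitRate.value();  // hits / requests, cumulative ratio
    Long cachedBytes = metrics.size.value();   // sum of pooled readers' buffer sizes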

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfe49376/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
new file mode 100644
index 0000000..9dd1b15
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.FileCacheMetrics;
+
+public class FileCacheService
+{
+    private static final Logger logger = LoggerFactory.getLogger(FileCacheService.class);
+
+    private static final long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; // long arithmetic avoids int overflow for settings > 2047 MB
+    private static final int AFTER_ACCESS_EXPIRATION = 512; // in millis
+
+    public static FileCacheService instance = new FileCacheService();
+
+    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final FileCacheMetrics metrics = new FileCacheMetrics();
+    public final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    {
+        @Override
+        public Queue<RandomAccessReader> call() throws Exception
+        {
+            return new ConcurrentLinkedQueue<RandomAccessReader>();
+        }
+    };
+
+    protected FileCacheService()
+    {
+        cache = CacheBuilder.<String, Queue<RandomAccessReader>>newBuilder()
+                            .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
+                            .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
+                            .removalListener(new RemovalListener<String, Queue<RandomAccessReader>>()
+                            {
+                                @Override
+                                public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+                                {
+                                    Queue<RandomAccessReader> cachedInstances = notification.getValue();
+
+                                    if (cachedInstances == null)
+                                        return;
+
+                                    for (RandomAccessReader reader : cachedInstances)
+                                        reader.deallocate();
+                                }
+                            })
+                            .build();
+    }
+
+    public RandomAccessReader get(String path)
+    {
+        metrics.requests.mark();
+
+        Queue<RandomAccessReader> instances = getCacheFor(path);
+
+        if (instances == null)
+            return null;
+
+        RandomAccessReader result = instances.poll();
+
+        if (result != null)
+            metrics.hits.mark();
+
+        return result;
+    }
+
+    private Queue<RandomAccessReader> getCacheFor(String path)
+    {
+        try
+        {
+            return cache.get(path, cacheForPathCreator);
+        }
+        catch (ExecutionException e)
+        {
+            // if something bad happened, let's just carry on and return null
+            // as a dysfunctional queue should not interrupt normal operation
+            logger.debug("Exception fetching cache", e);
+        }
+
+        return null;
+    }
+
+    public void put(RandomAccessReader instance)
+    {
+        // This estimate is not always precise when CRAR (CompressedRandomAccessReader)
+        // is used, because users can dynamically change the buffer size, but we
+        // don't expect that to happen frequently in production. cache.size() counts
+        // distinct paths rather than pooled readers, so this is only a rough estimate;
+        // accounting this way also lets us avoid an atomic CAS operation on the read path.
+        long memoryUsage = (cache.size() + 1) * instance.getBufferSize();
+
+        if (memoryUsage >= MEMORY_USAGE_THRESHOLD)
+            instance.deallocate();
+        else
+            getCacheFor(instance.getPath()).add(instance);
+    }
+
+    public void invalidate(String path)
+    {
+        cache.invalidate(path);
+    }
+
+    public long sizeInBytes()
+    {
+        long n = 0;
+        for (Queue<RandomAccessReader> queue : cache.asMap().values())
+            for (RandomAccessReader reader : queue)
+                n += reader.getBufferSize();
+        return n;
+    }
+}
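
Putting it together, a worked example of put()'s budget check under the default
settings (the numbers are hypothetical; 65536 bytes is a typical
RandomAccessReader buffer size, not a constant from this patch):

    long budget = 512L * 1024 * 1024;        // MEMORY_USAGE_THRESHOLD at the 512 MB default
    long cachedPaths = 8191;                 // cache.size(): distinct paths currently cached
    int bufferSize = 65536;                  // instance.getBufferSize() of the incoming reader
    long estimate = (cachedPaths + 1) * bufferSize;
    boolean deallocate = estimate >= budget; // true here: 8192 * 64 KB == 512 MB exactly

Separately, a queue untouched for 512 ms (AFTER_ACCESS_EXPIRATION) is evicted by
the Guava cache, and the removal listener deallocates every reader still parked
in it, so idle sstables hand their buffer memory back quickly.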