Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/28 01:17:03 UTC

[1/5] cassandra git commit: Expose off-heap memory usage stats

Repository: cassandra
Updated Branches:
  refs/heads/trunk 794d68b51 -> 371ad9eea


Expose off-heap memory usage stats

patch by Benjamin Lerer; reviewed by Aleksey Yeschenko for
CASSANDRA-7897


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a14a77f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a14a77f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a14a77f

Branch: refs/heads/trunk
Commit: 7a14a77f2cbe2ce10ac35d5853ddf9496a86e16e
Parents: 8b5cf64
Author: blerer <b_...@hotmail.com>
Authored: Fri Nov 28 02:55:46 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 28 02:55:46 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 15 +++++
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 18 +++++
 .../io/compress/CompressionMetadata.java        |  9 +++
 .../cassandra/io/sstable/IndexSummary.java      |  9 +++
 .../cassandra/io/sstable/SSTableReader.java     | 30 +++++++++
 .../cassandra/metrics/ColumnFamilyMetrics.java  | 36 ++++++++++
 .../cassandra/metrics/KeyspaceMetrics.java      | 27 ++++++++
 .../org/apache/cassandra/tools/NodeCmd.java     | 70 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  6 ++
 .../org/apache/cassandra/utils/IFilter.java     |  6 ++
 .../cassandra/utils/Murmur3BloomFilter.java     |  6 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  6 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  6 ++
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  6 ++
 15 files changed, 234 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 937edbb..8f4add9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Expose off-heap memory usage stats (CASSANDRA-7897)
  * Ignore Paxos commits for truncated tables (CASSANDRA-7538)
  * Validate size of indexed column values (CASSANDRA-8280)
  * Make LCS split compaction results over all data directories (CASSANDRA-8329)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6365b4f..06520ab 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2164,6 +2164,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return metric.bloomFilterDiskSpaceUsed.value();
     }
 
+    public long getBloomFilterOffHeapMemoryUsed()
+    {
+        return metric.bloomFilterOffHeapMemoryUsed.value();
+    }
+
+    public long getIndexSummaryOffHeapMemoryUsed()
+    {
+        return metric.indexSummaryOffHeapMemoryUsed.value();
+    }
+
+    public long getCompressionMetadataOffHeapMemoryUsed()
+    {
+        return metric.compressionMetadataOffHeapMemoryUsed.value();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 90c9f1f..53f7ba9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -230,6 +230,24 @@ public interface ColumnFamilyStoreMBean
     public long getBloomFilterDiskSpaceUsed();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterOffHeapMemoryUsed
+     */
+    @Deprecated
+    public long getBloomFilterOffHeapMemoryUsed();
+
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#indexSummaryOffHeapMemoryUsed
+     */
+    @Deprecated
+    public long getIndexSummaryOffHeapMemoryUsed();
+
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#compressionMetadataOffHeapMemoryUsed
+     */
+    @Deprecated
+    public long getCompressionMetadataOffHeapMemoryUsed();
+
+    /**
      * Gets the minimum number of sstables in queue before compaction kicks off
      */
     public int getMinimumCompactionThreshold();
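
The three getters above are plain JMX attributes (deprecated in favour of the
equivalent metrics gauges added below, but kept around for nodetool), so they
can be read with nothing beyond the JDK's JMX client API. A minimal sketch,
assuming remote JMX on the default port 7199 and a hypothetical table ks.cf;
the ColumnFamilies object-name pattern shown is the usual layout for this era,
but treat it as an assumption:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.db.ColumnFamilyStoreMBean;

    public class OffHeapStatsClient
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical endpoint and table; adjust for a real cluster.
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector jmxc = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
                ObjectName name = new ObjectName(
                        "org.apache.cassandra.db:type=ColumnFamilies,keyspace=ks,columnfamily=cf");
                ColumnFamilyStoreMBean cfs =
                        JMX.newMBeanProxy(mbs, name, ColumnFamilyStoreMBean.class);

                // Sum the three new attributes, mirroring what NodeCmd does below.
                long offHeapBytes = cfs.getBloomFilterOffHeapMemoryUsed()
                                  + cfs.getIndexSummaryOffHeapMemoryUsed()
                                  + cfs.getCompressionMetadataOffHeapMemoryUsed();
                System.out.printf("ks.cf off heap: %d bytes%n", offHeapBytes);
            }
        }
    }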

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index e75a7d7..231778a 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -126,6 +126,15 @@ public class CompressionMetadata
     }
 
     /**
+     * Returns the amount of memory in bytes used off heap.
+     * @return the amount of memory in bytes used off heap
+     */
+    public long offHeapSize()
+    {
+        return chunkOffsets.size();
+    }
+
+    /**
      * Read offsets of the individual chunks from the given input.
      *
      * @param input Source of the data.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index be7977e..b66071b 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -108,6 +108,15 @@ public class IndexSummary implements Closeable
         return summary_size;
     }
 
+    /**
+     * Returns the amount of memory in bytes used off heap.
+     * @return the amount of memory in bytes used off heap
+     */
+    public long offHeapSize()
+    {
+        return bytes.size();
+    }
+
     public static class IndexSummarySerializer
     {
         public void serialize(IndexSummary t, DataOutputStream out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 87f084c..f0e9e65 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -634,6 +634,18 @@ public class SSTableReader extends SSTable implements Closeable
     }
 
     /**
+     * Returns the amount of memory in bytes used off heap by the compression meta-data.
+     * @return the amount of memory in bytes used off heap by the compression meta-data
+     */
+    public long getCompressionMetadataOffHeapSize()
+    {
+        if (!compression)
+            return 0;
+
+        return getCompressionMetadata().offHeapSize();
+    }
+
+    /**
      * For testing purposes only.
      */
     public void forceFilterFailures()
@@ -652,6 +664,24 @@ public class SSTableReader extends SSTable implements Closeable
     }
 
     /**
+     * Returns the amount of memory in bytes used off heap by the bloom filter.
+     * @return the amount of memory in bytes used off heap by the bloom filter
+     */
+    public long getBloomFilterOffHeapSize()
+    {
+        return bf.offHeapSize();
+    }
+
+    /**
+     * Returns the amount of memory in bytes used off heap by the index summary.
+     * @return the amount of memory in bytes used off heap by the index summary
+     */
+    public long getIndexSummaryOffHeapSize()
+    {
+        return indexSummary.offHeapSize();
+    }
+
+    /**
      * @return An estimate of the number of keys in this SSTable.
      */
     public long estimatedKeys()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index c2b7d61..4dbc0f8 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -87,6 +87,12 @@ public class ColumnFamilyMetrics
     public final Gauge<Double> recentBloomFilterFalseRatio;
     /** Disk space used by bloom filter */
     public final Gauge<Long> bloomFilterDiskSpaceUsed;
+    /** Off heap memory used by bloom filter */
+    public final Gauge<Long> bloomFilterOffHeapMemoryUsed;
+    /** Off heap memory used by index summary */
+    public final Gauge<Long> indexSummaryOffHeapMemoryUsed;
+    /** Off heap memory used by compression metadata */
+    public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
     /** Key cache hit rate  for this CF */
     public final Gauge<Double> keyCacheHitRate;
     /** Tombstones scanned in queries on this CF */
@@ -429,6 +435,36 @@ public class ColumnFamilyMetrics
                 return total;
             }
         });
+        bloomFilterOffHeapMemoryUsed = createColumnFamilyGauge("BloomFilterOffHeapMemoryUsed", new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long total = 0;
+                for (SSTableReader sst : cfs.getSSTables())
+                    total += sst.getBloomFilterOffHeapSize();
+                return total;
+            }
+        });
+        indexSummaryOffHeapMemoryUsed = createColumnFamilyGauge("IndexSummaryOffHeapMemoryUsed", new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long total = 0;
+                for (SSTableReader sst : cfs.getSSTables())
+                    total += sst.getIndexSummaryOffHeapSize();
+                return total;
+            }
+        });
+        compressionMetadataOffHeapMemoryUsed = createColumnFamilyGauge("CompressionMetadataOffHeapMemoryUsed", new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long total = 0;
+                for (SSTableReader sst : cfs.getSSTables())
+                    total += sst.getCompressionMetadataOffHeapSize();
+                return total;
+            }
+        });
         speculativeRetries = createColumnFamilyCounter("SpeculativeRetries");
         keyCacheHitRate = Metrics.newGauge(factory.createMetricName("KeyCacheHitRate"), new RatioGauge()
         {
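
These per-table gauges (and the keyspace-level rollups in the next hunk) are
what the deprecated MBean getters delegate to. Since they are registered
through the yammer metrics JmxReporter, each gauge surfaces its current value
as a "Value" attribute under the org.apache.cassandra.metrics domain. A
minimal read sketch, reusing the MBeanServerConnection mbs from the client
sketch earlier and the same hypothetical ks.cf; the type/keyspace/scope/name
layout follows the usual ColumnFamilyMetrics naming convention, but verify it
against a running node:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;

    static long readGauge(MBeanServerConnection mbs, String name) throws Exception
    {
        // Yammer gauges expose their current value as the "Value" attribute.
        return (Long) mbs.getAttribute(new ObjectName(name), "Value");
    }

    // Per-table gauge (type=ColumnFamily, scope=<table>):
    long cfBytes = readGauge(mbs,
            "org.apache.cassandra.metrics:type=ColumnFamily,keyspace=ks,scope=cf,name=BloomFilterOffHeapMemoryUsed");
    // Keyspace rollup registered by KeyspaceMetrics in the next hunk:
    long ksBytes = readGauge(mbs,
            "org.apache.cassandra.metrics:type=Keyspace,keyspace=ks,name=BloomFilterOffHeapMemoryUsed");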

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 0ea982e..fd89983 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -50,6 +50,12 @@ public class KeyspaceMetrics
     public final Gauge<Long> totalDiskSpaceUsed;
     /** Disk space used by bloom filter */
     public final Gauge<Long> bloomFilterDiskSpaceUsed;
+    /** Off heap memory used by bloom filter */
+    public final Gauge<Long> bloomFilterOffHeapMemoryUsed;
+    /** Off heap memory used by index summary */
+    public final Gauge<Long> indexSummaryOffHeapMemoryUsed;
+    /** Off heap memory used by compression metadata */
+    public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
     /** (Local) read metrics */
     public final LatencyMetrics readLatency;
     /** (Local) range slice metrics */
@@ -149,6 +155,27 @@ public class KeyspaceMetrics
                 return metric.bloomFilterDiskSpaceUsed.value();
             }
         });
+        bloomFilterOffHeapMemoryUsed = createKeyspaceGauge("BloomFilterOffHeapMemoryUsed", new MetricValue()
+        {
+            public Long getValue(ColumnFamilyMetrics metric)
+            {
+                return metric.bloomFilterOffHeapMemoryUsed.value();
+            }
+        });
+        indexSummaryOffHeapMemoryUsed = createKeyspaceGauge("IndexSummaryOffHeapMemoryUsed", new MetricValue()
+        {
+            public Long getValue(ColumnFamilyMetrics metric)
+            {
+                return metric.indexSummaryOffHeapMemoryUsed.value();
+            }
+        });
+        compressionMetadataOffHeapMemoryUsed = createKeyspaceGauge("CompressionMetadataOffHeapMemoryUsed", new MetricValue()
+        {
+            public Long getValue(ColumnFamilyMetrics metric)
+            {
+                return metric.compressionMetadataOffHeapMemoryUsed.value();
+            }
+        });
         // latency metrics for ColumnFamilyMetrics to update
         readLatency = new LatencyMetrics(factory, "Read");
         writeLatency = new LatencyMetrics(factory, "Write");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 2d7809a..e4a14b2 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -635,41 +635,44 @@ public class NodeCmd
         // If there is just 1 token, print it now like we always have, otherwise,
         // require that -T/--tokens be passed (that output is potentially verbose).
         if (toks.size() == 1)
-            outs.printf("%-17s: %s%n", "Token", toks.get(0));
+            outs.printf("%-23s: %s%n", "Token", toks.get(0));
         else if (!cmd.hasOption(TOKENS_OPT.left))
-            outs.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", toks.size());
+            outs.printf("%-23s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", toks.size());
 
-        outs.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
-        outs.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
-        outs.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
-        outs.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
-        outs.printf("%-17s: %s%n", "Load", probe.getLoadString());
+        outs.printf("%-23s: %s%n", "ID", probe.getLocalHostId());
+        outs.printf("%-23s: %s%n", "Gossip active", gossipInitialized);
+        outs.printf("%-23s: %s%n", "Thrift active", probe.isThriftServerRunning());
+        outs.printf("%-23s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
+        outs.printf("%-23s: %s%n", "Load", probe.getLoadString());
         if (gossipInitialized)
-            outs.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
+            outs.printf("%-23s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
         else
-            outs.printf("%-17s: %s%n", "Generation No", 0);
+            outs.printf("%-23s: %s%n", "Generation No", 0);
 
         // Uptime
         long secondsUp = probe.getUptime() / 1000;
-        outs.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
+        outs.printf("%-23s: %d%n", "Uptime (seconds)", secondsUp);
 
         // Memory usage
         MemoryUsage heapUsage = probe.getHeapMemoryUsage();
         double memUsed = (double)heapUsage.getUsed() / (1024 * 1024);
         double memMax = (double)heapUsage.getMax() / (1024 * 1024);
-        outs.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
+        outs.printf("%-23s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
+
+        // Off heap memory usage
+        outs.printf("%-23s: %.2f%n", "Off Heap Memory (MB)", getOffHeapMemoryUsed());
 
         // Data Center/Rack
-        outs.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
-        outs.printf("%-17s: %s%n", "Rack", probe.getRack());
+        outs.printf("%-23s: %s%n", "Data Center", probe.getDataCenter());
+        outs.printf("%-23s: %s%n", "Rack", probe.getRack());
 
         // Exceptions
-        outs.printf("%-17s: %s%n", "Exceptions", probe.getExceptionCount());
+        outs.printf("%-23s: %s%n", "Exceptions", probe.getExceptionCount());
 
         CacheServiceMBean cacheService = probe.getCacheServiceMBean();
 
         // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-        outs.printf("%-17s: size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+        outs.printf("%-23s: size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
                     "Key Cache",
                     cacheService.getKeyCacheSize(),
                     cacheService.getKeyCacheCapacityInBytes(),
@@ -679,7 +682,7 @@ public class NodeCmd
                     cacheService.getKeyCacheSavePeriodInSeconds());
 
         // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-        outs.printf("%-17s: size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
+        outs.printf("%-23s: size %d (bytes), capacity %d (bytes), %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
                     "Row Cache",
                     cacheService.getRowCacheSize(),
                     cacheService.getRowCacheCapacityInBytes(),
@@ -691,8 +694,30 @@ public class NodeCmd
         if (toks.size() > 1 && cmd.hasOption(TOKENS_OPT.left))
         {
             for (String tok : toks)
-                outs.printf("%-17s: %s%n", "Token", tok);
+                outs.printf("%-23s: %s%n", "Token", tok);
+        }
+    }
+
+    /**
+     * Returns the total off heap memory used in MB.
+     * @return the total off heap memory used in MB.
+     */
+    private double getOffHeapMemoryUsed()
+    {
+        long offHeapMemUsedInBytes = 0;
+        // get a list of column family stores
+        Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
+
+        while (cfamilies.hasNext())
+        {
+            Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
+            ColumnFamilyStoreMBean cfsProxy = entry.getValue();
+            offHeapMemUsedInBytes += cfsProxy.getBloomFilterOffHeapMemoryUsed();
+            offHeapMemUsedInBytes += cfsProxy.getIndexSummaryOffHeapMemoryUsed();
+            offHeapMemUsedInBytes += cfsProxy.getCompressionMetadataOffHeapMemoryUsed();
         }
+
+        return offHeapMemUsedInBytes / (1024d * 1024);
     }
 
     public void printReleaseVersion(PrintStream outs)
@@ -948,8 +973,16 @@ public class NodeCmd
                             outs.println("]");
                     }
                 }
+
+                long bloomFilterOffHeapSize = cfstore.getBloomFilterOffHeapMemoryUsed();
+                long indexSummaryOffHeapSize = cfstore.getIndexSummaryOffHeapMemoryUsed();
+                long compressionMetadataOffHeapSize = cfstore.getCompressionMetadataOffHeapMemoryUsed();
+
+                long offHeapSize = bloomFilterOffHeapSize + indexSummaryOffHeapSize + compressionMetadataOffHeapSize;
+
                 outs.println("\t\tSpace used (live), bytes: " + cfstore.getLiveDiskSpaceUsed());
                 outs.println("\t\tSpace used (total), bytes: " + cfstore.getTotalDiskSpaceUsed());
+                outs.println("\t\tOff heap memory used (total), bytes: " + offHeapSize);
                 outs.println("\t\tSSTable Compression Ratio: " + cfstore.getCompressionRatio());
                 outs.println("\t\tNumber of keys (estimate): " + cfstore.estimateKeys());
                 outs.println("\t\tMemtable cell count: " + cfstore.getMemtableColumnsCount());
@@ -963,6 +996,9 @@ public class NodeCmd
                 outs.println("\t\tBloom filter false positives: " + cfstore.getBloomFilterFalsePositives());
                 outs.println("\t\tBloom filter false ratio: " + String.format("%01.5f", cfstore.getRecentBloomFilterFalseRatio()));
                 outs.println("\t\tBloom filter space used, bytes: " + cfstore.getBloomFilterDiskSpaceUsed());
+                outs.println("\t\tBloom filter off heap memory used, bytes: " + bloomFilterOffHeapSize);
+                outs.println("\t\tIndex summary off heap memory used, bytes: " + indexSummaryOffHeapSize);
+                outs.println("\t\tCompression metadata off heap memory used, bytes: " + compressionMetadataOffHeapSize);
                 outs.println("\t\tCompacted partition minimum bytes: " + cfstore.getMinRowSize());
                 outs.println("\t\tCompacted partition maximum bytes: " + cfstore.getMaxRowSize());
                 outs.println("\t\tCompacted partition mean bytes: " + cfstore.getMeanRowSize());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 0f5136b..0886edc 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -34,4 +34,10 @@ public class AlwaysPresentFilter implements IFilter
     public void close() throws IOException { }
 
     public long serializedSize() { return 0; }
+
+    @Override
+    public long offHeapSize()
+    {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/IFilter.java b/src/java/org/apache/cassandra/utils/IFilter.java
index 10f6df2..aed5f39 100644
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@ -29,4 +29,10 @@ public interface IFilter extends Closeable
     void clear();
 
     long serializedSize();
+
+    /**
+     * Returns the amount of memory in bytes used off heap.
+     * @return the amount of memory in bytes used off heap
+     */
+    long offHeapSize();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
index 3c2a47e..f7c7632 100644
--- a/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/Murmur3BloomFilter.java
@@ -36,6 +36,12 @@ public class Murmur3BloomFilter extends BloomFilter
         return serializer.serializedSize(this, TypeSizes.NATIVE);
     }
 
+    @Override
+    public long offHeapSize()
+    {
+        return bitset.offHeapSize();
+    }
+
     protected void hash(ByteBuffer b, int position, int remaining, long seed, long[] result)
     {
         MurmurHash.hash3_x64_128(b, b.position(), b.remaining(), seed, result);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/IBitSet.java b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
index c6fbddd..47ba492 100644
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@ -49,4 +49,10 @@ public interface IBitSet extends Closeable
     public long serializedSize(TypeSizes type);
 
     public void clear();
+
+    /**
+     * Returns the amount of memory in bytes used off heap.
+     * @return the amount of memory in bytes used off heap
+     */
+    public long offHeapSize();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index 29dd848..5063d80 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -62,6 +62,12 @@ public class OffHeapBitSet implements IBitSet
         return bytes.size() * 8;
     }
 
+    @Override
+    public long offHeapSize()
+    {
+        return bytes.size();
+    }
+
     public boolean get(long index)
     {
         long i = index >> 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a14a77f/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
index b5310fa..3e1efce 100644
--- a/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
@@ -104,6 +104,12 @@ public class OpenBitSet implements IBitSet
   /** Returns the current capacity in bits (1 greater than the index of the last bit) */
   public long capacity() { return ((long)wlen) << 6; }
 
+  @Override
+  public long offHeapSize()
+  {
+      return 0;
+  }
+
  /**
   * Returns the current capacity of this set.  Included for
   * compatibility.  This is *not* equal to {@link #cardinality}


[3/5] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeTool.java
index 1db0245,0000000..fe4535b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@@ -1,2476 -1,0 +1,2516 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools;
 +
 +import java.io.*;
 +import java.lang.management.MemoryUsage;
 +import java.net.InetAddress;
 +import java.net.UnknownHostException;
 +import java.text.DecimalFormat;
 +import java.text.SimpleDateFormat;
 +import java.util.*;
 +import java.util.Map.Entry;
 +import java.util.concurrent.ExecutionException;
 +
 +import javax.management.openmbean.TabularData;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Throwables;
 +import com.google.common.collect.ArrayListMultimap;
 +import com.google.common.collect.LinkedHashMultimap;
 +import com.google.common.collect.Maps;
 +import com.yammer.metrics.reporting.JmxReporter;
 +
 +import io.airlift.command.*;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 +import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 +import org.apache.cassandra.locator.LocalStrategy;
 +import org.apache.cassandra.net.MessagingServiceMBean;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.service.CacheServiceMBean;
 +import org.apache.cassandra.streaming.ProgressInfo;
 +import org.apache.cassandra.streaming.SessionInfo;
 +import org.apache.cassandra.streaming.StreamState;
 +import org.apache.cassandra.utils.EstimatedHistogram;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +import static com.google.common.base.Preconditions.checkState;
 +import static com.google.common.base.Throwables.getStackTraceAsString;
 +import static com.google.common.collect.Iterables.toArray;
 +import static com.google.common.collect.Lists.newArrayList;
 +import static java.lang.Integer.parseInt;
 +import static java.lang.String.format;
 +import static org.apache.commons.lang3.ArrayUtils.EMPTY_STRING_ARRAY;
 +import static org.apache.commons.lang3.StringUtils.*;
 +
 +public class NodeTool
 +{
 +    private static final String HISTORYFILE = "nodetool.history";
 +
 +    public static void main(String... args)
 +    {
 +        List<Class<? extends Runnable>> commands = newArrayList(
 +                Help.class,
 +                Info.class,
 +                Ring.class,
 +                NetStats.class,
 +                CfStats.class,
 +                CfHistograms.class,
 +                Cleanup.class,
 +                ClearSnapshot.class,
 +                Compact.class,
 +                Scrub.class,
 +                Flush.class,
 +                UpgradeSSTable.class,
 +                DisableAutoCompaction.class,
 +                EnableAutoCompaction.class,
 +                CompactionStats.class,
 +                CompactionHistory.class,
 +                Decommission.class,
 +                DescribeCluster.class,
 +                DisableBinary.class,
 +                EnableBinary.class,
 +                EnableGossip.class,
 +                DisableGossip.class,
 +                EnableHandoff.class,
 +                EnableThrift.class,
 +                GcStats.class,
 +                GetCompactionThreshold.class,
 +                GetCompactionThroughput.class,
 +                GetStreamThroughput.class,
 +                GetEndpoints.class,
 +                GetSSTables.class,
 +                GossipInfo.class,
 +                InvalidateKeyCache.class,
 +                InvalidateRowCache.class,
 +                InvalidateCounterCache.class,
 +                Join.class,
 +                Move.class,
 +                PauseHandoff.class,
 +                ResumeHandoff.class,
 +                ProxyHistograms.class,
 +                Rebuild.class,
 +                Refresh.class,
 +                RemoveToken.class,
 +                RemoveNode.class,
 +                Repair.class,
 +                SetCacheCapacity.class,
 +                SetHintedHandoffThrottleInKB.class,
 +                SetCompactionThreshold.class,
 +                SetCompactionThroughput.class,
 +                SetStreamThroughput.class,
 +                SetTraceProbability.class,
 +                Snapshot.class,
 +                ListSnapshots.class,
 +                Status.class,
 +                StatusBinary.class,
 +                StatusGossip.class,
 +                StatusThrift.class,
 +                Stop.class,
 +                StopDaemon.class,
 +                Version.class,
 +                DescribeRing.class,
 +                RebuildIndex.class,
 +                RangeKeySample.class,
 +                EnableBackup.class,
 +                DisableBackup.class,
 +                ResetLocalSchema.class,
 +                ReloadTriggers.class,
 +                SetCacheKeysToSave.class,
 +                DisableThrift.class,
 +                DisableHandoff.class,
 +                Drain.class,
 +                TruncateHints.class,
 +                TpStats.class,
 +                SetLoggingLevel.class,
 +                GetLoggingLevels.class
 +        );
 +
 +        Cli<Runnable> parser = Cli.<Runnable>builder("nodetool")
 +                .withDescription("Manage your Cassandra cluster")
 +                .withDefaultCommand(Help.class)
 +                .withCommands(commands)
 +                .build();
 +
 +        int status = 0;
 +        try
 +        {
 +            Runnable parse = parser.parse(args);
 +            printHistory(args);
 +            parse.run();
 +        } catch (IllegalArgumentException |
 +                IllegalStateException |
 +                ParseArgumentsMissingException |
 +                ParseArgumentsUnexpectedException |
 +                ParseOptionConversionException |
 +                ParseOptionMissingException |
 +                ParseOptionMissingValueException |
 +                ParseCommandMissingException |
 +                ParseCommandUnrecognizedException e)
 +        {
 +            badUse(e);
 +            status = 1;
 +        } catch (Throwable throwable)
 +        {
 +            err(Throwables.getRootCause(throwable));
 +            status = 2;
 +        }
 +
 +        System.exit(status);
 +    }
 +
 +    private static void printHistory(String... args)
 +    {
 +        //don't bother to print if no args passed (meaning, nodetool is just printing out the sub-commands list)
 +        if (args.length == 0)
 +            return;
 +
 +        String cmdLine = Joiner.on(" ").skipNulls().join(args);
 +        cmdLine = cmdLine.replaceFirst("(?<=(-pw|--password))\\s+\\S+", " <hidden>");
 +
 +        try (FileWriter writer = new FileWriter(new File(FBUtilities.getToolsOutputDirectory(), HISTORYFILE), true))
 +        {
 +            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
 +            writer.append(sdf.format(new Date())).append(": ").append(cmdLine).append(System.lineSeparator());
 +        }
 +        catch (IOException | IOError ioe)
 +        {
 +            //quietly ignore any errors about not being able to write out history
 +        }
 +    }
 +
 +    private static void badUse(Exception e)
 +    {
 +        System.out.println("nodetool: " + e.getMessage());
 +        System.out.println("See 'nodetool help' or 'nodetool help <command>'.");
 +    }
 +
 +    private static void err(Throwable e)
 +    {
 +        System.err.println("error: " + e.getMessage());
 +        System.err.println("-- StackTrace --");
 +        System.err.println(getStackTraceAsString(e));
 +    }
 +
 +    public static abstract class NodeToolCmd implements Runnable
 +    {
 +
 +        @Option(type = OptionType.GLOBAL, name = {"-h", "--host"}, description = "Node hostname or ip address")
 +        private String host = "127.0.0.1";
 +
 +        @Option(type = OptionType.GLOBAL, name = {"-p", "--port"}, description = "Remote jmx agent port number")
 +        private String port = "7199";
 +
 +        @Option(type = OptionType.GLOBAL, name = {"-u", "--username"}, description = "Remote jmx agent username")
 +        private String username = EMPTY;
 +
 +        @Option(type = OptionType.GLOBAL, name = {"-pw", "--password"}, description = "Remote jmx agent password")
 +        private String password = EMPTY;
 +
 +        @Option(type = OptionType.GLOBAL, name = {"-pwf", "--password-file"}, description = "Path to the JMX password file")
 +        private String passwordFilePath = EMPTY;
 +
 +        @Override
 +        public void run()
 +        {
 +            if (isNotEmpty(username)) {
 +                if (isNotEmpty(passwordFilePath))
 +                    password = readUserPasswordFromFile(username, passwordFilePath);
 +
 +                if (isEmpty(password))
 +                    password = promptAndReadPassword();
 +            }
 +
 +            try (NodeProbe probe = connect())
 +            {
 +                execute(probe);
 +            } 
 +            catch (IOException e)
 +            {
 +                throw new RuntimeException("Error while closing JMX connection", e);
 +            }
 +
 +        }
 +
 +        private String readUserPasswordFromFile(String username, String passwordFilePath) {
 +            String password = EMPTY;
 +
 +            File passwordFile = new File(passwordFilePath);
 +            try (Scanner scanner = new Scanner(passwordFile).useDelimiter("\\s+"))
 +            {
 +                while (scanner.hasNextLine())
 +                {
 +                    if (scanner.hasNext())
 +                    {
 +                        String jmxRole = scanner.next();
 +                        if (jmxRole.equals(username) && scanner.hasNext())
 +                        {
 +                            password = scanner.next();
 +                            break;
 +                        }
 +                    }
 +                    scanner.nextLine();
 +                }
 +            } catch (FileNotFoundException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +
 +            return password;
 +        }
 +
 +        private String promptAndReadPassword()
 +        {
 +            String password = EMPTY;
 +
 +            Console console = System.console();
 +            if (console != null)
 +                password = String.valueOf(console.readPassword("Password:"));
 +
 +            return password;
 +        }
 +
 +        protected abstract void execute(NodeProbe probe);
 +
 +        private NodeProbe connect()
 +        {
 +            NodeProbe nodeClient = null;
 +
 +            try
 +            {
 +                if (username.isEmpty())
 +                    nodeClient = new NodeProbe(host, parseInt(port));
 +                else
 +                    nodeClient = new NodeProbe(host, parseInt(port), username, password);
 +            } catch (IOException e)
 +            {
 +                Throwable rootCause = Throwables.getRootCause(e);
 +                System.err.println(format("nodetool: Failed to connect to '%s:%s' - %s: '%s'.", host, port, rootCause.getClass().getSimpleName(), rootCause.getMessage()));
 +                System.exit(1);
 +            }
 +
 +            return nodeClient;
 +        }
 +
 +        protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe)
 +        {
 +            List<String> keyspaces = new ArrayList<>();
 +
 +            if (cmdArgs == null || cmdArgs.isEmpty())
 +                keyspaces.addAll(nodeProbe.getKeyspaces());
 +            else
 +                keyspaces.add(cmdArgs.get(0));
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                if (!nodeProbe.getKeyspaces().contains(keyspace))
 +                    throw new IllegalArgumentException("Keyspace [" + keyspace + "] does not exist.");
 +            }
 +
 +            return Collections.unmodifiableList(keyspaces);
 +        }
 +
 +        protected String[] parseOptionalColumnFamilies(List<String> cmdArgs)
 +        {
 +            return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
 +        }
 +    }
 +
 +    @Command(name = "info", description = "Print node information (uptime, load, ...)")
 +    public static class Info extends NodeToolCmd
 +    {
 +        @Option(name = {"-T", "--tokens"}, description = "Display all tokens")
 +        private boolean tokens = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            boolean gossipInitialized = probe.isInitialized();
 +
-             System.out.printf("%-17s: %s%n", "ID", probe.getLocalHostId());
-             System.out.printf("%-17s: %s%n", "Gossip active", gossipInitialized);
-             System.out.printf("%-17s: %s%n", "Thrift active", probe.isThriftServerRunning());
-             System.out.printf("%-17s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
-             System.out.printf("%-17s: %s%n", "Load", probe.getLoadString());
++            System.out.printf("%-23s: %s%n", "ID", probe.getLocalHostId());
++            System.out.printf("%-23s: %s%n", "Gossip active", gossipInitialized);
++            System.out.printf("%-23s: %s%n", "Thrift active", probe.isThriftServerRunning());
++            System.out.printf("%-23s: %s%n", "Native Transport active", probe.isNativeTransportRunning());
++            System.out.printf("%-23s: %s%n", "Load", probe.getLoadString());
 +            if (gossipInitialized)
-                 System.out.printf("%-17s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
++                System.out.printf("%-23s: %s%n", "Generation No", probe.getCurrentGenerationNumber());
 +            else
-                 System.out.printf("%-17s: %s%n", "Generation No", 0);
++                System.out.printf("%-23s: %s%n", "Generation No", 0);
 +
 +            // Uptime
 +            long secondsUp = probe.getUptime() / 1000;
-             System.out.printf("%-17s: %d%n", "Uptime (seconds)", secondsUp);
++            System.out.printf("%-23s: %d%n", "Uptime (seconds)", secondsUp);
 +
 +            // Memory usage
 +            MemoryUsage heapUsage = probe.getHeapMemoryUsage();
 +            double memUsed = (double) heapUsage.getUsed() / (1024 * 1024);
 +            double memMax = (double) heapUsage.getMax() / (1024 * 1024);
-             System.out.printf("%-17s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
++            System.out.printf("%-23s: %.2f / %.2f%n", "Heap Memory (MB)", memUsed, memMax);
++            System.out.printf("%-23s: %.2f%n", "Off Heap Memory (MB)", getOffHeapMemoryUsed(probe));
 +
 +            // Data Center/Rack
-             System.out.printf("%-17s: %s%n", "Data Center", probe.getDataCenter());
-             System.out.printf("%-17s: %s%n", "Rack", probe.getRack());
++            System.out.printf("%-23s: %s%n", "Data Center", probe.getDataCenter());
++            System.out.printf("%-23s: %s%n", "Rack", probe.getRack());
 +
 +            // Exceptions
-             System.out.printf("%-17s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
++            System.out.printf("%-23s: %s%n", "Exceptions", probe.getStorageMetric("Exceptions"));
 +
 +            CacheServiceMBean cacheService = probe.getCacheServiceMBean();
 +
 +            // Key Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-             System.out.printf("%-17s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
++            System.out.printf("%-23s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
 +                    "Key Cache",
 +                    probe.getCacheMetric("KeyCache", "Entries"),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("KeyCache", "Size")),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("KeyCache", "Capacity")),
 +                    probe.getCacheMetric("KeyCache", "Hits"),
 +                    probe.getCacheMetric("KeyCache", "Requests"),
 +                    probe.getCacheMetric("KeyCache", "HitRate"),
 +                    cacheService.getKeyCacheSavePeriodInSeconds());
 +
 +            // Row Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-             System.out.printf("%-17s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
++            System.out.printf("%-23s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
 +                    "Row Cache",
 +                    probe.getCacheMetric("RowCache", "Entries"),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("RowCache", "Size")),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("RowCache", "Capacity")),
 +                    probe.getCacheMetric("RowCache", "Hits"),
 +                    probe.getCacheMetric("RowCache", "Requests"),
 +                    probe.getCacheMetric("RowCache", "HitRate"),
 +                    cacheService.getRowCacheSavePeriodInSeconds());
 +
 +            // Counter Cache: Hits, Requests, RecentHitRate, SavePeriodInSeconds
-             System.out.printf("%-17s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
++            System.out.printf("%-23s: entries %d, size %s, capacity %s, %d hits, %d requests, %.3f recent hit rate, %d save period in seconds%n",
 +                    "Counter Cache",
 +                    probe.getCacheMetric("CounterCache", "Entries"),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("CounterCache", "Size")),
 +                    FileUtils.stringifyFileSize((long) probe.getCacheMetric("CounterCache", "Capacity")),
 +                    probe.getCacheMetric("CounterCache", "Hits"),
 +                    probe.getCacheMetric("CounterCache", "Requests"),
 +                    probe.getCacheMetric("CounterCache", "HitRate"),
 +                    cacheService.getCounterCacheSavePeriodInSeconds());
 +
 +            // Tokens
 +            List<String> tokens = probe.getTokens();
 +            if (tokens.size() == 1 || this.tokens)
 +                for (String token : tokens)
-                     System.out.printf("%-17s: %s%n", "Token", token);
++                    System.out.printf("%-23s: %s%n", "Token", token);
 +            else
-                 System.out.printf("%-17s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", tokens.size());
++                System.out.printf("%-23s: (invoke with -T/--tokens to see all %d tokens)%n", "Token", tokens.size());
++        }
++
++        /**
++         * Returns the total off heap memory used in MB.
++         * @return the total off heap memory used in MB.
++         */
++        private static double getOffHeapMemoryUsed(NodeProbe probe)
++        {
++            long offHeapMemUsedInBytes = 0;
++            // get a list of column family stores
++            Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
++
++            while (cfamilies.hasNext())
++            {
++                Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
++                String keyspaceName = entry.getKey();
++                String cfName = entry.getValue().getColumnFamilyName();
++
++                offHeapMemUsedInBytes += (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableOffHeapSize");
++                offHeapMemUsedInBytes += (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterOffHeapMemoryUsed");
++                offHeapMemUsedInBytes += (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "IndexSummaryOffHeapMemoryUsed");
++                offHeapMemUsedInBytes += (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionMetadataOffHeapMemoryUsed");
++            }
++
++            return offHeapMemUsedInBytes / (1024d * 1024);
 +        }
 +    }
 +
 +    @Command(name = "ring", description = "Print information about the token ring")
 +    public static class Ring extends NodeToolCmd
 +    {
 +        @Arguments(description = "Specify a keyspace for accurate ownership information (topology awareness)")
 +        private String keyspace = null;
 +
 +        @Option(title = "resolve_ip", name = {"-r", "--resolve-ip"}, description = "Show node domain names instead of IPs")
 +        private boolean resolveIp = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            Map<String, String> tokensToEndpoints = probe.getTokenToEndpointMap();
 +            LinkedHashMultimap<String, String> endpointsToTokens = LinkedHashMultimap.create();
 +            boolean haveVnodes = false;
 +            for (Map.Entry<String, String> entry : tokensToEndpoints.entrySet())
 +            {
 +                haveVnodes |= endpointsToTokens.containsKey(entry.getValue());
 +                endpointsToTokens.put(entry.getValue(), entry.getKey());
 +            }
 +
 +            int maxAddressLength = Collections.max(endpointsToTokens.keys(), new Comparator<String>()
 +            {
 +                @Override
 +                public int compare(String first, String second)
 +                {
 +                    return ((Integer) first.length()).compareTo(second.length());
 +                }
 +            }).length();
 +
 +            String formatPlaceholder = "%%-%ds  %%-12s%%-7s%%-8s%%-16s%%-20s%%-44s%%n";
 +            String format = format(formatPlaceholder, maxAddressLength);
 +
 +            StringBuffer errors = new StringBuffer();
 +            boolean showEffectiveOwnership = true;
 +            // Calculate per-token ownership of the ring
 +            Map<InetAddress, Float> ownerships;
 +            try
 +            {
 +                ownerships = probe.effectiveOwnership(keyspace);
 +            } 
 +            catch (IllegalStateException ex)
 +            {
 +                ownerships = probe.getOwnership();
 +                errors.append("Note: " + ex.getMessage() + "%n");
 +                showEffectiveOwnership = false;
 +            } 
 +            catch (IllegalArgumentException ex)
 +            {
 +                System.out.printf("%nError: " + ex.getMessage() + "%n");
 +                return;
 +            }
 +
 +            
 +            System.out.println();
 +            for (Entry<String, SetHostStat> entry : getOwnershipByDc(probe, resolveIp, tokensToEndpoints, ownerships).entrySet())
 +                printDc(probe, format, entry.getKey(), endpointsToTokens, entry.getValue(),showEffectiveOwnership);
 +
 +            if (haveVnodes)
 +            {
 +                System.out.println("  Warning: \"nodetool ring\" is used to output all the tokens of a node.");
 +                System.out.println("  To view status related info of a node use \"nodetool status\" instead.\n");
 +            }
 +
 +            System.out.printf("%n  " + errors.toString());
 +        }
 +
 +        private void printDc(NodeProbe probe, String format,
 +                             String dc,
 +                             LinkedHashMultimap<String, String> endpointsToTokens,
 +                             SetHostStat hoststats,boolean showEffectiveOwnership)
 +        {
 +            Collection<String> liveNodes = probe.getLiveNodes();
 +            Collection<String> deadNodes = probe.getUnreachableNodes();
 +            Collection<String> joiningNodes = probe.getJoiningNodes();
 +            Collection<String> leavingNodes = probe.getLeavingNodes();
 +            Collection<String> movingNodes = probe.getMovingNodes();
 +            Map<String, String> loadMap = probe.getLoadMap();
 +
 +            System.out.println("Datacenter: " + dc);
 +            System.out.println("==========");
 +
 +            // get the total amount of replicas for this dc and the last token in this dc's ring
 +            List<String> tokens = new ArrayList<>();
 +            String lastToken = "";
 +
 +            for (HostStat stat : hoststats)
 +            {
 +                tokens.addAll(endpointsToTokens.get(stat.endpoint.getHostAddress()));
 +                lastToken = tokens.get(tokens.size() - 1);
 +            }
 +
 +            System.out.printf(format, "Address", "Rack", "Status", "State", "Load", "Owns", "Token");
 +
 +            if (hoststats.size() > 1)
 +                System.out.printf(format, "", "", "", "", "", "", lastToken);
 +            else
 +                System.out.println();
 +
 +            for (HostStat stat : hoststats)
 +            {
 +                String endpoint = stat.endpoint.getHostAddress();
 +                String rack;
 +                try
 +                {
 +                    rack = probe.getEndpointSnitchInfoProxy().getRack(endpoint);
 +                }
 +                catch (UnknownHostException e)
 +                {
 +                    rack = "Unknown";
 +                }
 +
 +                String status = liveNodes.contains(endpoint)
 +                        ? "Up"
 +                        : deadNodes.contains(endpoint)
 +                                ? "Down"
 +                                : "?";
 +
 +                String state = "Normal";
 +
 +                if (joiningNodes.contains(endpoint))
 +                    state = "Joining";
 +                else if (leavingNodes.contains(endpoint))
 +                    state = "Leaving";
 +                else if (movingNodes.contains(endpoint))
 +                    state = "Moving";
 +
 +                String load = loadMap.containsKey(endpoint)
 +                        ? loadMap.get(endpoint)
 +                        : "?";
 +                String owns = stat.owns != null && showEffectiveOwnership? new DecimalFormat("##0.00%").format(stat.owns) : "?";
 +                System.out.printf(format, stat.ipOrDns(), rack, status, state, load, owns, stat.token);
 +            }
 +            System.out.println();
 +        }
 +    }
 +
 +    @Command(name = "netstats", description = "Print network information on provided host (connecting node by default)")
 +    public static class NetStats extends NodeToolCmd
 +    {
 +        @Option(title = "human_readable",
 +                name = {"-H", "--human-readable"},
 +                description = "Display bytes in human readable form, i.e. KB, MB, GB, TB")
 +        private boolean humanReadable = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.out.printf("Mode: %s%n", probe.getOperationMode());
 +            Set<StreamState> statuses = probe.getStreamStatus();
 +            if (statuses.isEmpty())
 +                System.out.println("Not sending any streams.");
 +            for (StreamState status : statuses)
 +            {
 +                System.out.printf("%s %s%n", status.description, status.planId.toString());
 +                for (SessionInfo info : status.sessions)
 +                {
 +                    System.out.printf("    %s", info.peer.toString());
 +                    // print private IP when it is used
 +                    if (!info.peer.equals(info.connecting))
 +                    {
 +                        System.out.printf(" (using %s)", info.connecting.toString());
 +                    }
 +                    System.out.printf("%n");
 +                    if (!info.receivingSummaries.isEmpty())
 +                    {
 +                        if (humanReadable)
 +                            System.out.printf("        Receiving %d files, %s total%n", info.getTotalFilesToReceive(), FileUtils.stringifyFileSize(info.getTotalSizeToReceive()));
 +                        else
 +                            System.out.printf("        Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());
 +                        for (ProgressInfo progress : info.getReceivingFiles())
 +                        {
 +                            System.out.printf("            %s%n", progress.toString());
 +                        }
 +                    }
 +                    if (!info.sendingSummaries.isEmpty())
 +                    {
 +                        if (humanReadable)
 +                            System.out.printf("        Sending %d files, %s total%n", info.getTotalFilesToSend(), FileUtils.stringifyFileSize(info.getTotalSizeToSend()));
 +                        else
 +                            System.out.printf("        Sending %d files, %d bytes total%n", info.getTotalFilesToSend(), info.getTotalSizeToSend());
 +                        for (ProgressInfo progress : info.getSendingFiles())
 +                        {
 +                            System.out.printf("            %s%n", progress.toString());
 +                        }
 +                    }
 +                }
 +            }
 +
 +            System.out.printf("Read Repair Statistics:%nAttempted: %d%nMismatch (Blocking): %d%nMismatch (Background): %d%n", probe.getReadRepairAttempted(), probe.getReadRepairRepairedBlocking(), probe.getReadRepairRepairedBackground());
 +
 +            MessagingServiceMBean ms = probe.msProxy;
 +            System.out.printf("%-25s", "Pool Name");
 +            System.out.printf("%10s", "Active");
 +            System.out.printf("%10s", "Pending");
 +            System.out.printf("%15s%n", "Completed");
 +
 +            int pending;
 +            long completed;
 +
 +            pending = 0;
 +            for (int n : ms.getCommandPendingTasks().values())
 +                pending += n;
 +            completed = 0;
 +            for (long n : ms.getCommandCompletedTasks().values())
 +                completed += n;
 +            System.out.printf("%-25s%10s%10s%15s%n", "Commands", "n/a", pending, completed);
 +
 +            pending = 0;
 +            for (int n : ms.getResponsePendingTasks().values())
 +                pending += n;
 +            completed = 0;
 +            for (long n : ms.getResponseCompletedTasks().values())
 +                completed += n;
 +            System.out.printf("%-25s%10s%10s%15s%n", "Responses", "n/a", pending, completed);
 +        }
 +    }
 +
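The -H flag is purely cosmetic: byte totals go through
FileUtils.stringifyFileSize instead of being printed raw. A minimal sketch of
the difference, not part of the patch, assuming a 123456789-byte total:

    // Same helper the command uses; it picks the largest fitting unit.
    long bytes = 123456789L;
    System.out.println(FileUtils.stringifyFileSize(bytes)); // e.g. "117.74 MB"
    System.out.println(Long.toString(bytes));               // "123456789"
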
 +    @Command(name = "cfstats", description = "Print statistics on column families")
 +    public static class CfStats extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace.cfname>...]", description = "List of column families (or keyspace) names")
 +        private List<String> cfnames = new ArrayList<>();
 +
 +        @Option(name = "-i", description = "Ignore the list of column families and display the remaining cfs")
 +        private boolean ignore = false;
 +
 +        @Option(title = "human_readable",
 +                name = {"-H", "--human-readable"},
 +                description = "Display bytes in human readable form, i.e. KB, MB, GB, TB")
 +        private boolean humanReadable = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            OptionFilter filter = new OptionFilter(ignore, cfnames);
 +            Map<String, List<ColumnFamilyStoreMBean>> cfstoreMap = new HashMap<>();
 +
 +            // get a list of column family stores
 +            Iterator<Map.Entry<String, ColumnFamilyStoreMBean>> cfamilies = probe.getColumnFamilyStoreMBeanProxies();
 +
 +            while (cfamilies.hasNext())
 +            {
 +                Map.Entry<String, ColumnFamilyStoreMBean> entry = cfamilies.next();
 +                String keyspaceName = entry.getKey();
 +                ColumnFamilyStoreMBean cfsProxy = entry.getValue();
 +
 +                if (!filter.isColumnFamilyIncluded(keyspaceName, cfsProxy.getColumnFamilyName()))
 +                    continue;
 +
 +                if (!cfstoreMap.containsKey(keyspaceName))
 +                    cfstoreMap.put(keyspaceName, new ArrayList<ColumnFamilyStoreMBean>());
 +
 +                cfstoreMap.get(keyspaceName).add(cfsProxy);
 +            }
 +
 +            // make sure all specified kss and cfs exist
 +            filter.verifyKeyspaces(probe.getKeyspaces());
 +            filter.verifyColumnFamilies();
 +
 +            // print out the table statistics
 +            for (Map.Entry<String, List<ColumnFamilyStoreMBean>> entry : cfstoreMap.entrySet())
 +            {
 +                String keyspaceName = entry.getKey();
 +                List<ColumnFamilyStoreMBean> columnFamilies = entry.getValue();
 +                long keyspaceReadCount = 0;
 +                long keyspaceWriteCount = 0;
 +                int keyspacePendingFlushes = 0;
 +                double keyspaceTotalReadTime = 0.0;
 +                double keyspaceTotalWriteTime = 0.0;
 +
 +                System.out.println("Keyspace: " + keyspaceName);
 +                for (ColumnFamilyStoreMBean cfstore : columnFamilies)
 +                {
 +                    String cfName = cfstore.getColumnFamilyName();
 +                    long writeCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount();
 +                    long readCount = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount();
 +
 +                    if (readCount > 0)
 +                    {
 +                        keyspaceReadCount += readCount;
 +                        keyspaceTotalReadTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadTotalLatency");
 +                    }
 +                    if (writeCount > 0)
 +                    {
 +                        keyspaceWriteCount += writeCount;
 +                        keyspaceTotalWriteTime += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteTotalLatency");
 +                    }
 +                    keyspacePendingFlushes += (long) probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingFlushes");
 +                }
 +
 +                double keyspaceReadLatency = keyspaceReadCount > 0
 +                                             ? keyspaceTotalReadTime / keyspaceReadCount / 1000
 +                                             : Double.NaN;
 +                double keyspaceWriteLatency = keyspaceWriteCount > 0
 +                                              ? keyspaceTotalWriteTime / keyspaceWriteCount / 1000
 +                                              : Double.NaN;
 +
 +                System.out.println("\tRead Count: " + keyspaceReadCount);
 +                System.out.println("\tRead Latency: " + String.format("%s", keyspaceReadLatency) + " ms.");
 +                System.out.println("\tWrite Count: " + keyspaceWriteCount);
 +                System.out.println("\tWrite Latency: " + String.format("%s", keyspaceWriteLatency) + " ms.");
 +                System.out.println("\tPending Flushes: " + keyspacePendingFlushes);
 +
 +                // print out column family statistics for this keyspace
 +                for (ColumnFamilyStoreMBean cfstore : columnFamilies)
 +                {
 +                    String cfName = cfstore.getColumnFamilyName();
 +                    if (cfName.contains("."))
 +                        System.out.println("\t\tTable (index): " + cfName);
 +                    else
 +                        System.out.println("\t\tTable: " + cfName);
 +
 +                    System.out.println("\t\tSSTable count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveSSTableCount"));
 +
 +                    int[] leveledSStables = cfstore.getSSTableCountPerLevel();
 +                    if (leveledSStables != null)
 +                    {
 +                        System.out.print("\t\tSSTables in each level: [");
 +                        for (int level = 0; level < leveledSStables.length; level++)
 +                        {
 +                            int count = leveledSStables[level];
 +                            System.out.print(count);
 +                            long maxCount = 4L; // for L0
 +                            if (level > 0)
 +                                maxCount = (long) Math.pow(10, level);
 +                            //  show max threshold for level when exceeded
 +                            if (count > maxCount)
 +                                System.out.print("/" + maxCount);
 +
 +                            if (level < leveledSStables.length - 1)
 +                                System.out.print(", ");
 +                            else
 +                                System.out.println("]");
 +                        }
 +                    }
++
++                    Long memtableOffHeapSize = (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableOffHeapSize");
++                    Long bloomFilterOffHeapSize = (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterOffHeapMemoryUsed");
++                    Long indexSummaryOffHeapSize = (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "IndexSummaryOffHeapMemoryUsed");
++                    Long compressionMetadataOffHeapSize = (Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionMetadataOffHeapMemoryUsed");
++
++                    long offHeapSize = memtableOffHeapSize + bloomFilterOffHeapSize + indexSummaryOffHeapSize + compressionMetadataOffHeapSize;
++
 +                    System.out.println("\t\tSpace used (live): " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveDiskSpaceUsed"), humanReadable));
 +                    System.out.println("\t\tSpace used (total): " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "TotalDiskSpaceUsed"), humanReadable));
 +                    System.out.println("\t\tSpace used by snapshots (total): " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "SnapshotsSize"), humanReadable));
++                    System.out.println("\t\tOff heap memory used (total): " + format(offHeapSize, humanReadable));
 +                    System.out.println("\t\tSSTable Compression Ratio: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "CompressionRatio"));
 +                    System.out.println("\t\tMemtable cell count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableColumnsCount"));
 +                    System.out.println("\t\tMemtable data size: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableLiveDataSize"), humanReadable));
++                    System.out.println("\t\tMemtable off heap memory used: " + format(memtableOffHeapSize, humanReadable));
 +                    System.out.println("\t\tMemtable switch count: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "MemtableSwitchCount"));
 +                    System.out.println("\t\tLocal read count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getCount());
 +                    double localReadLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "ReadLatency")).getMean() / 1000;
 +                    double localRLatency = localReadLatency > 0 ? localReadLatency : Double.NaN;
 +                    System.out.printf("\t\tLocal read latency: %01.3f ms%n", localRLatency);
 +                    System.out.println("\t\tLocal write count: " + ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getCount());
 +                    double localWriteLatency = ((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "WriteLatency")).getMean() / 1000;
 +                    double localWLatency = localWriteLatency > 0 ? localWriteLatency : Double.NaN;
 +                    System.out.printf("\t\tLocal write latency: %01.3f ms%n", localWLatency);
 +                    System.out.println("\t\tPending flushes: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "PendingFlushes"));
 +                    System.out.println("\t\tBloom filter false positives: " + probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterFalsePositives"));
 +                    System.out.printf("\t\tBloom filter false ratio: %s%n", String.format("%01.5f", probe.getColumnFamilyMetric(keyspaceName, cfName, "RecentBloomFilterFalseRatio")));
 +                    System.out.println("\t\tBloom filter space used: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "BloomFilterDiskSpaceUsed"), humanReadable));
++                    System.out.println("\t\tBloom filter off heap memory used: " + format(bloomFilterOffHeapSize, humanReadable));
++                    System.out.println("\t\tIndex summary off heap memory used: " + format(indexSummaryOffHeapSize, humanReadable));
++                    System.out.println("\t\tCompression metadata off heap memory used: " + format(compressionMetadataOffHeapSize, humanReadable));
++
 +                    System.out.println("\t\tCompacted partition minimum bytes: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MinRowSize"), humanReadable));
 +                    System.out.println("\t\tCompacted partition maximum bytes: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MaxRowSize"), humanReadable));
 +                    System.out.println("\t\tCompacted partition mean bytes: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, cfName, "MeanRowSize"), humanReadable));
 +                    JmxReporter.HistogramMBean histogram = (JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "LiveScannedHistogram");
 +                    System.out.println("\t\tAverage live cells per slice (last five minutes): " + histogram.getMean());
 +                    System.out.println("\t\tMaximum live cells per slice (last five minutes): " + histogram.getMax());
 +                    histogram = (JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspaceName, cfName, "TombstoneScannedHistogram");
 +                    System.out.println("\t\tAverage tombstones per slice (last five minutes): " + histogram.getMean());
 +                    System.out.println("\t\tMaximum tombstones per slice (last five minutes): " + histogram.getMax());
 +
 +                    System.out.println("");
 +                }
 +                System.out.println("----------------");
 +            }
 +        }
 +
 +        private String format(long bytes, boolean humanReadable) {
 +            return humanReadable ? FileUtils.stringifyFileSize(bytes) : Long.toString(bytes);
 +        }
 +
 +        /**
 +         * Used for filtering keyspaces and columnfamilies to be displayed using the cfstats command.
 +         */
 +        private static class OptionFilter
 +        {
 +            private Map<String, List<String>> filter = new HashMap<>();
 +            private Map<String, List<String>> verifier = new HashMap<>();
 +            private List<String> filterList = new ArrayList<>();
 +            private boolean ignoreMode;
 +
 +            public OptionFilter(boolean ignoreMode, List<String> filterList)
 +            {
 +                this.filterList.addAll(filterList);
 +                this.ignoreMode = ignoreMode;
 +
 +                for (String s : filterList)
 +                {
 +                    String[] keyValues = s.split("\\.", 2);
 +
 +                    // build the map of keyspaces -> column families to use
 +                    if (!filter.containsKey(keyValues[0]))
 +                    {
 +                        filter.put(keyValues[0], new ArrayList<String>());
 +                        verifier.put(keyValues[0], new ArrayList<String>());
 +                    }
 +
 +                    if (keyValues.length == 2)
 +                    {
 +                        filter.get(keyValues[0]).add(keyValues[1]);
 +                        verifier.get(keyValues[0]).add(keyValues[1]);
 +                    }
 +                }
 +            }
 +
 +            public boolean isColumnFamilyIncluded(String keyspace, String columnFamily)
 +            {
 +                // supplying empty params list is treated as wanting to display all kss & cfs
 +                if (filterList.isEmpty())
 +                    return !ignoreMode;
 +
 +                List<String> cfs = filter.get(keyspace);
 +
 +                // no such keyspace is in the map
 +                if (cfs == null)
 +                    return ignoreMode;
 +                    // only a keyspace with no cfs was supplied
 +                    // so ignore or include (based on the flag) every column family in specified keyspace
 +                else if (cfs.size() == 0)
 +                    return !ignoreMode;
 +
 +                // keyspace exists, and it contains specific cfs
 +                verifier.get(keyspace).remove(columnFamily);
 +                return ignoreMode ^ cfs.contains(columnFamily);
 +            }
 +
 +            public void verifyKeyspaces(List<String> keyspaces)
 +            {
 +                for (String ks : verifier.keySet())
 +                    if (!keyspaces.contains(ks))
 +                        throw new IllegalArgumentException("Unknown keyspace: " + ks);
 +            }
 +
 +            public void verifyColumnFamilies()
 +            {
 +                for (String ks : filter.keySet())
 +                    if (verifier.get(ks).size() > 0)
 +                        throw new IllegalArgumentException("Unknown column families: " + verifier.get(ks).toString() + " in keyspace: " + ks);
 +            }
 +        }
 +    }
 +
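The ignoreMode XOR in OptionFilter is what makes -i an inversion rather than a
separate code path: a match excludes the table instead of including it. A
minimal sketch of the four cases, not part of the patch, assuming a filter
list of "ks1.cf1":

    // isColumnFamilyIncluded returns ignoreMode ^ cfs.contains(cf),
    // so the -i flag simply flips the verdict of a match.
    OptionFilter include = new OptionFilter(false, Arrays.asList("ks1.cf1"));
    include.isColumnFamilyIncluded("ks1", "cf1"); // true  (false ^ true)
    include.isColumnFamilyIncluded("ks1", "cf2"); // false (false ^ false)

    OptionFilter exclude = new OptionFilter(true, Arrays.asList("ks1.cf1"));
    exclude.isColumnFamilyIncluded("ks1", "cf1"); // false (true ^ true)
    exclude.isColumnFamilyIncluded("ks1", "cf2"); // true  (true ^ false)
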
 +    @Command(name = "cfhistograms", description = "Print statistic histograms for a given column family")
 +    public static class CfHistograms extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 2, "cfhistograms requires ks and cf args");
 +
 +            String keyspace = args.get(0);
 +            String cfname = args.get(1);
 +
 +            // calculate percentile of row size and column count
 +            long[] estimatedRowSize = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedRowSizeHistogram");
 +            long[] estimatedColumnCount = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedColumnCountHistogram");
 +
 +            long[] rowSizeBucketOffsets = new EstimatedHistogram(estimatedRowSize.length).getBucketOffsets();
 +            long[] columnCountBucketOffsets = new EstimatedHistogram(estimatedColumnCount.length).getBucketOffsets();
 +            EstimatedHistogram rowSizeHist = new EstimatedHistogram(rowSizeBucketOffsets, estimatedRowSize);
 +            EstimatedHistogram columnCountHist = new EstimatedHistogram(columnCountBucketOffsets, estimatedColumnCount);
 +
 +            // build arrays to store percentile values
 +            double[] estimatedRowSizePercentiles = new double[7];
 +            double[] estimatedColumnCountPercentiles = new double[7];
 +            double[] offsetPercentiles = new double[]{0.5, 0.75, 0.95, 0.98, 0.99};
 +
 +            if (rowSizeHist.isOverflowed())
 +            {
 +                System.err.println(String.format("Row sizes are larger than %s, unable to calculate percentiles", rowSizeBucketOffsets[rowSizeBucketOffsets.length - 1]));
 +                for (int i = 0; i < offsetPercentiles.length; i++)
 +                    estimatedRowSizePercentiles[i] = Double.NaN;
 +            }
 +            else
 +            {
 +                for (int i = 0; i < offsetPercentiles.length; i++)
 +                    estimatedRowSizePercentiles[i] = rowSizeHist.percentile(offsetPercentiles[i]);
 +            }
 +
 +            if (columnCountHist.isOverflowed())
 +            {
 +                System.err.println(String.format("Column counts are larger than %s, unable to calculate percentiles", columnCountBucketOffsets[columnCountBucketOffsets.length - 1]));
 +                for (int i = 0; i < estimatedColumnCountPercentiles.length; i++)
 +                    estimatedColumnCountPercentiles[i] = Double.NaN;
 +            }
 +            else
 +            {
 +                for (int i = 0; i < offsetPercentiles.length; i++)
 +                    estimatedColumnCountPercentiles[i] = columnCountHist.percentile(offsetPercentiles[i]);
 +            }
 +
 +            // min value
 +            estimatedRowSizePercentiles[5] = rowSizeHist.min();
 +            estimatedColumnCountPercentiles[5] = columnCountHist.min();
 +            // max value
 +            estimatedRowSizePercentiles[6] = rowSizeHist.max();
 +            estimatedColumnCountPercentiles[6] = columnCountHist.max();
 +
 +            String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
 +            double[] readLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspace, cfname, "ReadLatency"));
 +            double[] writeLatency = probe.metricPercentilesAsArray((JmxReporter.TimerMBean) probe.getColumnFamilyMetric(keyspace, cfname, "WriteLatency"));
 +            double[] sstablesPerRead = probe.metricPercentilesAsArray((JmxReporter.HistogramMBean) probe.getColumnFamilyMetric(keyspace, cfname, "SSTablesPerReadHistogram"));
 +
 +            System.out.println(format("%s/%s histograms", keyspace, cfname));
 +            System.out.println(format("%-10s%10s%18s%18s%18s%18s",
 +                    "Percentile", "SSTables", "Write Latency", "Read Latency", "Partition Size", "Cell Count"));
 +            System.out.println(format("%-10s%10s%18s%18s%18s%18s",
 +                    "", "", "(micros)", "(micros)", "(bytes)", ""));
 +
 +            for (int i = 0; i < percentiles.length; i++)
 +            {
 +                System.out.println(format("%-10s%10.2f%18.2f%18.2f%18.0f%18.0f",
 +                        percentiles[i],
 +                        sstablesPerRead[i],
 +                        writeLatency[i],
 +                        readLatency[i],
 +                        estimatedRowSizePercentiles[i],
 +                        estimatedColumnCountPercentiles[i]));
 +            }
 +            System.out.println();
 +        }
 +    }
 +
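The percentile math above relies on EstimatedHistogram's bucket offsets being
a pure function of the bucket count, so the histogram can be rebuilt
client-side from the raw counts alone. A minimal sketch, not part of the
patch, reusing the keyspace/cfname names above:

    // Derive the offsets from the bucket count, pair them with the raw
    // counts pulled over JMX, then query percentiles locally.
    long[] counts = (long[]) probe.getColumnFamilyMetric(keyspace, cfname, "EstimatedRowSizeHistogram");
    long[] offsets = new EstimatedHistogram(counts.length).getBucketOffsets();
    EstimatedHistogram hist = new EstimatedHistogram(offsets, counts);
    if (!hist.isOverflowed())
        System.out.printf("p50 partition size: %d bytes%n", hist.percentile(0.5));
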
 +    @Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces")
 +    public static class Cleanup extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                if (Keyspace.SYSTEM_KS.equals(keyspace))
 +                    continue;
 +
 +                try
 +                {
 +                    probe.forceKeyspaceCleanup(System.out, keyspace, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during cleanup", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "clearsnapshot", description = "Remove the snapshot with the given name from the given keyspaces. If no snapshotName is specified we will remove all snapshots")
 +    public static class ClearSnapshot extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspaces>...] ", description = "Remove snapshots from the given keyspaces")
 +        private List<String> keyspaces = new ArrayList<>();
 +
 +        @Option(title = "snapshot_name", name = "-t", description = "Remove the snapshot with a given name")
 +        private String snapshotName = EMPTY;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            StringBuilder sb = new StringBuilder();
 +
 +            sb.append("Requested clearing snapshot(s) for ");
 +
 +            if (keyspaces.isEmpty())
 +                sb.append("[all keyspaces]");
 +            else
 +                sb.append("[").append(join(keyspaces, ", ")).append("]");
 +
 +            if (!snapshotName.isEmpty())
 +                sb.append(" with snapshot name [").append(snapshotName).append("]");
 +
 +            System.out.println(sb.toString());
 +
 +            try
 +            {
 +                probe.clearSnapshot(snapshotName, toArray(keyspaces, String.class));
 +            } catch (IOException e)
 +            {
 +                throw new RuntimeException("Error during clearing snapshots", e);
 +            }
 +        }
 +    }
 +
 +    @Command(name = "compact", description = "Force a (major) compaction on one or more column families")
 +    public static class Compact extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.forceKeyspaceCompaction(keyspace, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during compaction", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "flush", description = "Flush one or more column families")
 +    public static class Flush extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.forceKeyspaceFlush(keyspace, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during flushing", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more column families")
 +    public static class Scrub extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Option(title = "disable_snapshot",
 +                name = {"-ns", "--no-snapshot"},
 +                description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
 +        private boolean disableSnapshot = false;
 +
 +        @Option(title = "skip_corrupted",
 +                name = {"-s", "--skip-corrupted"},
 +                description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
 +        private boolean skipCorrupted = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.scrub(System.out, disableSnapshot, skipCorrupted, keyspace, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during flushing", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "disableautocompaction", description = "Disable autocompaction for the given keyspace and column family")
 +    public static class DisableAutoCompaction extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.disableAutoCompaction(keyspace, cfnames);
 +                } catch (IOException e)
 +                {
 +                    throw new RuntimeException("Error occurred during disabling auto-compaction", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "enableautocompaction", description = "Enable autocompaction for the given keyspace and column family")
 +    public static class EnableAutoCompaction extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.enableAutoCompaction(keyspace, cfnames);
 +                } catch (IOException e)
 +                {
 +                    throw new RuntimeException("Error occurred during enabling auto-compaction", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "upgradesstables", description = "Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version)")
 +    public static class UpgradeSSTable extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
 +        private boolean includeAll = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during enabling auto-compaction", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "compactionstats", description = "Print statistics on compactions")
 +    public static class CompactionStats extends NodeToolCmd
 +    {
 +        @Option(title = "human_readable",
 +                name = {"-H", "--human-readable"},
 +                description = "Display bytes in human readable form, i.e. KB, MB, GB, TB")
 +        private boolean humanReadable = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            int compactionThroughput = probe.getCompactionThroughput();
 +            CompactionManagerMBean cm = probe.getCompactionManagerProxy();
 +            System.out.println("pending tasks: " + probe.getCompactionMetric("PendingTasks"));
 +            long remainingBytes = 0;
 +            List<Map<String, String>> compactions = cm.getCompactions();
 +            if (!compactions.isEmpty())
 +            {
 +                List<String[]> lines = new ArrayList<>();
 +                int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0 };
 +
 +                addLine(lines, columnSizes, "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
 +                for (Map<String, String> c : compactions)
 +                {
 +                    long total = Long.parseLong(c.get("total"));
 +                    long completed = Long.parseLong(c.get("completed"));
 +                    String taskType = c.get("taskType");
 +                    String keyspace = c.get("keyspace");
 +                    String columnFamily = c.get("columnfamily");
 +                    String completedStr = humanReadable ? FileUtils.stringifyFileSize(completed) : Long.toString(completed);
 +                    String totalStr = humanReadable ? FileUtils.stringifyFileSize(total) : Long.toString(total);
 +                    String unit = c.get("unit");
 +                    String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
 +                    addLine(lines, columnSizes, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
 +                    if (taskType.equals(OperationType.COMPACTION.toString()))
 +                        remainingBytes += total - completed;
 +                }
 +
 +                StringBuilder buffer = new StringBuilder();
 +                for (int columnSize : columnSizes) {
 +                    buffer.append("%");
 +                    buffer.append(columnSize + 3);
 +                    buffer.append("s");
 +                }
 +                buffer.append("%n");
 +                String format = buffer.toString();
 +
 +                for (String[] line : lines)
 +                {
 +                    System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6]);
 +                }
 +
 +                String remainingTime = "n/a";
 +                if (compactionThroughput != 0)
 +                {
 +                    long remainingTimeInSecs = remainingBytes / (1024L * 1024L * compactionThroughput);
 +                    remainingTime = format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
 +                }
 +                System.out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
 +            }
 +        }
 +
 +        private void addLine(List<String[]> lines, int[] columnSizes, String... columns) {
 +            lines.add(columns);
 +            for (int i = 0; i < columns.length; i++) {
 +                columnSizes[i] = Math.max(columnSizes[i], columns[i].length());
 +            }
 +        }
 +    }
 +
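Two computations above are worth tracing through. The format string grows from
the widest cell per column plus three spaces of padding, and the
remaining-time estimate is just bytes left over the throughput cap. A minimal
sketch with assumed values, not part of the patch:

    // Column widths {15, 8, 5, 9, 5, 4, 8} yield "%18s%11s%8s%12s%8s%7s%11s%n".
    int[] widths = { 15, 8, 5, 9, 5, 4, 8 };
    StringBuilder fmt = new StringBuilder();
    for (int w : widths)
        fmt.append('%').append(w + 3).append('s');
    fmt.append("%n");

    // 4 GiB left at a 16 MB/s cap: 4294967296 / (1024 * 1024 * 16) = 256s.
    long remainingBytes = 4L * 1024 * 1024 * 1024;
    long secs = remainingBytes / (1024L * 1024L * 16);
    System.out.printf("%dh%02dm%02ds%n", secs / 3600, (secs % 3600) / 60, secs % 60); // "0h04m16s"
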
 +    @Command(name = "compactionhistory", description = "Print history of compaction")
 +    public static class CompactionHistory extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.out.println("Compaction History: ");
 +
 +            TabularData tabularData = probe.getCompactionHistory();
 +            if (tabularData.isEmpty())
 +            {
 +                System.out.printf("There is no compaction history");
 +                return;
 +            }
 +
 +            String format = "%-41s%-19s%-29s%-26s%-15s%-15s%s%n";
 +            List<String> indexNames = tabularData.getTabularType().getIndexNames();
 +            System.out.printf(format, toArray(indexNames, Object.class));
 +
 +            Set<?> values = tabularData.keySet();
 +            for (Object eachValue : values)
 +            {
 +                List<?> value = (List<?>) eachValue;
 +                System.out.printf(format, toArray(value, Object.class));
 +            }
 +        }
 +    }
 +
 +    @Command(name = "decommission", description = "Decommission the *node I am connecting to*")
 +    public static class Decommission extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            try
 +            {
 +                probe.decommission();
 +            } catch (InterruptedException e)
 +            {
 +                throw new RuntimeException("Error decommissioning node", e);
 +            }
 +        }
 +    }
 +
 +    @Command(name = "describecluster", description = "Print the name, snitch, partitioner and schema version of a cluster")
 +    public static class DescribeCluster extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            // display cluster name, snitch and partitioner
 +            System.out.println("Cluster Information:");
 +            System.out.println("\tName: " + probe.getClusterName());
 +            System.out.println("\tSnitch: " + probe.getEndpointSnitchInfoProxy().getSnitchName());
 +            System.out.println("\tPartitioner: " + probe.getPartitioner());
 +
 +            // display schema version for each node
 +            System.out.println("\tSchema versions:");
 +            Map<String, List<String>> schemaVersions = probe.getSpProxy().getSchemaVersions();
 +            for (String version : schemaVersions.keySet())
 +            {
 +                System.out.println(format("\t\t%s: %s%n", version, schemaVersions.get(version)));
 +            }
 +        }
 +    }
 +
 +    @Command(name = "disablebinary", description = "Disable native transport (binary protocol)")
 +    public static class DisableBinary extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.stopNativeTransport();
 +        }
 +    }
 +
 +    @Command(name = "enablebinary", description = "Reenable native transport (binary protocol)")
 +    public static class EnableBinary extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.startNativeTransport();
 +        }
 +    }
 +
 +    @Command(name = "enablegossip", description = "Reenable gossip")
 +    public static class EnableGossip extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.startGossiping();
 +        }
 +    }
 +
 +    @Command(name = "disablegossip", description = "Disable gossip (effectively marking the node down)")
 +    public static class DisableGossip extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.stopGossiping();
 +        }
 +    }
 +
 +    @Command(name = "enablehandoff", description = "Reenable the future hints storing on the current node")
 +    public static class EnableHandoff extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<dc-name>,<dc-name>", description = "Enable hinted handoff only for these DCs")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() <= 1, "enablehandoff does not accept two args");
 +            if(args.size() == 1)
 +                probe.enableHintedHandoff(args.get(0));
 +            else
 +                probe.enableHintedHandoff();
 +        }
 +    }
 +
 +    @Command(name = "enablethrift", description = "Reenable thrift server")
 +    public static class EnableThrift extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.startThriftServer();
 +        }
 +    }
 +
 +    @Command(name = "getcompactionthreshold", description = "Print min and max compaction thresholds for a given column family")
 +    public static class GetCompactionThreshold extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace with a column family")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 2, "getcompactionthreshold requires ks and cf args");
 +            String ks = args.get(0);
 +            String cf = args.get(1);
 +
 +            ColumnFamilyStoreMBean cfsProxy = probe.getCfsProxy(ks, cf);
 +            System.out.println("Current compaction thresholds for " + ks + "/" + cf + ": \n" +
 +                    " min = " + cfsProxy.getMinimumCompactionThreshold() + ", " +
 +                    " max = " + cfsProxy.getMaximumCompactionThreshold());
 +        }
 +    }
 +
 +    @Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
 +    public static class GetCompactionThroughput extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
 +        }
 +    }
 +
 +    @Command(name = "getstreamthroughput", description = "Print the Mb/s throughput cap for streaming in the system")
 +    public static class GetStreamThroughput extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.out.println("Current stream throughput: " + probe.getStreamThroughput() + " Mb/s");
 +        }
 +    }
 +
 +    @Command(name = "getendpoints", description = "Print the end points that owns the key")
 +    public static class GetEndpoints extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key for which we need to find the endpoint")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 3, "getendpoints requires ks, cf and key args");
 +            String ks = args.get(0);
 +            String cf = args.get(1);
 +            String key = args.get(2);
 +
 +            List<InetAddress> endpoints = probe.getEndpoints(ks, cf, key);
 +            for (InetAddress endpoint : endpoints)
 +            {
 +                System.out.println(endpoint.getHostAddress());
 +            }
 +        }
 +    }
 +
 +    @Command(name = "getsstables", description = "Print the sstable filenames that own the key")
 +    public static class GetSSTables extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<keyspace> <cfname> <key>", description = "The keyspace, the column family, and the key")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 3, "getsstables requires ks, cf and key args");
 +            String ks = args.get(0);
 +            String cf = args.get(1);
 +            String key = args.get(2);
 +
 +            List<String> sstables = probe.getSSTables(ks, cf, key);
 +            for (String sstable : sstables)
 +            {
 +                System.out.println(sstable);
 +            }
 +        }
 +    }
 +
 +    @Command(name = "gossipinfo", description = "Shows the gossip information for the cluster")
 +    public static class GossipInfo extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.out.println(probe.getGossipInfo());
 +        }
 +    }
 +
 +    @Command(name = "invalidatekeycache", description = "Invalidate the key cache")
 +    public static class InvalidateKeyCache extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.invalidateKeyCache();
 +        }
 +    }
 +
 +    @Command(name = "invalidaterowcache", description = "Invalidate the row cache")
 +    public static class InvalidateRowCache extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.invalidateRowCache();
 +        }
 +    }
 +
 +    @Command(name = "invalidatecountercache", description = "Invalidate the counter cache")
 +    public static class InvalidateCounterCache extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.invalidateCounterCache();
 +        }
 +    }
 +
 +    @Command(name = "join", description = "Join the ring")
 +    public static class Join extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkState(!probe.isJoined(), "This node has already joined the ring.");
 +
 +            try
 +            {
 +                probe.joinRing();
 +            } catch (IOException e)
 +            {
 +                throw new RuntimeException("Error during joining the ring", e);
 +            }
 +        }
 +    }
 +
 +    @Command(name = "move", description = "Move node on the token ring to a new token")
 +    public static class Move extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<new token>", description = "The new token.", required = true)
 +        private String newToken = EMPTY;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            try
 +            {
 +                probe.move(newToken);
 +            } catch (IOException e)
 +            {
 +                throw new RuntimeException("Error during moving node", e);
 +            }
 +        }
 +    }
 +
 +    @Command(name = "pausehandoff", description = "Pause hints delivery process")
 +    public static class PauseHandoff extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.pauseHintsDelivery();
 +        }
 +    }
 +
 +    @Command(name = "resumehandoff", description = "Resume hints delivery process")
 +    public static class ResumeHandoff extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.resumeHintsDelivery();
 +        }
 +    }
 +
 +    @Command(name = "proxyhistograms", description = "Print statistic histograms for network operations")
 +    public static class ProxyHistograms extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            String[] percentiles = new String[]{"50%", "75%", "95%", "98%", "99%", "Min", "Max"};
 +            double[] readLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Read"));
 +            double[] writeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("Write"));
 +            double[] rangeLatency = probe.metricPercentilesAsArray(probe.getProxyMetric("RangeSlice"));
 +
 +            System.out.println("proxy histograms");
 +            System.out.println(format("%-10s%18s%18s%18s",
 +                    "Percentile", "Read Latency", "Write Latency", "Range Latency"));
 +            System.out.println(format("%-10s%18s%18s%18s",
 +                    "", "(micros)", "(micros)", "(micros)"));
 +            for (int i = 0; i < percentiles.length; i++)
 +            {
 +                System.out.println(format("%-10s%18.2f%18.2f%18.2f",
 +                        percentiles[i],
 +                        readLatency[i],
 +                        writeLatency[i],
 +                        rangeLatency[i]));
 +            }
 +            System.out.println();
 +        }
 +    }
 +
 +    @Command(name = "rebuild", description = "Rebuild data by streaming from other nodes (similarly to bootstrap)")
 +    public static class Rebuild extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<src-dc-name>", description = "Name of DC from which to select sources for streaming. By default, pick any DC")
 +        private String sourceDataCenterName = null;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.rebuild(sourceDataCenterName);
 +        }
 +    }
 +
 +    @Command(name = "refresh", description = "Load newly placed SSTables to the system without restart")
 +    public static class Refresh extends NodeToolCmd
 +    {
 +        @Arguments(usage = "<keyspace> <cfname>", description = "The keyspace and column family name")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 2, "refresh requires ks and cf args");
 +            probe.loadNewSSTables(args.get(0), args.get(1));
 +        }
 +    }
 +
 +    @Deprecated
 +    @Command(name = "removetoken", description = "DEPRECATED (see removenode)", hidden = true)
 +    public static class RemoveToken extends NodeToolCmd
 +    {
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            System.err.println("Warn: removetoken is deprecated, please use removenode instead");
 +        }
 +    }
 +
 +    @Command(name = "removenode", description = "Show status of current node removal, force completion of pending removal or remove provided ID")
 +    public static class RemoveNode extends NodeToolCmd
 +    {
 +        @Arguments(title = "remove_operation", usage = "<status>|<force>|<ID>", description = "Show status of current node removal, force completion of pending removal, or remove provided ID", required = true)
 +        private String removeOperation = EMPTY;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            switch (removeOperation)
 +            {
 +                case "status":
 +                    System.out.println("RemovalStatus: " + probe.getRemovalStatus());
 +                    break;
 +                case "force":
 +                    System.out.println("RemovalStatus: " + probe.getRemovalStatus());
 +                    probe.forceRemoveCompletion();
 +                    break;
 +                default:
 +                    probe.removeNode(removeOperation);
 +                    break;
 +            }
 +        }
 +    }
 +
 +    @Command(name = "repair", description = "Repair one or more column families")
 +    public static class Repair extends NodeToolCmd
 +    {
 +        @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace followed by one or many column families")
 +        private List<String> args = new ArrayList<>();
 +
 +        @Option(title = "parallel", name = {"-par", "--parallel"}, description = "Use -par to carry out a parallel repair")
 +        private boolean parallel = false;
 +
 +        @Option(title = "dc parallel", name = {"-dcpar", "--dc-parallel"}, description = "Use -dcpar to repair data centers in parallel.")
 +        private boolean dcParallel = false;
 +
 +        @Option(title = "local_dc", name = {"-local", "--in-local-dc"}, description = "Use -local to only repair against nodes in the same datacenter")
 +        private boolean localDC = false;
 +
 +        @Option(title = "specific_dc", name = {"-dc", "--in-dc"}, description = "Use -dc to repair specific datacenters")
 +        private List<String> specificDataCenters = new ArrayList<>();
 +
 +        @Option(title = "specific_host", name = {"-hosts", "--in-hosts"}, description = "Use -hosts to repair specific hosts")
 +        private List<String> specificHosts = new ArrayList<>();
 +
 +        @Option(title = "start_token", name = {"-st", "--start-token"}, description = "Use -st to specify a token at which the repair range starts")
 +        private String startToken = EMPTY;
 +
 +        @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which repair range ends")
 +        private String endToken = EMPTY;
 +
 +        @Option(title = "primary_range", name = {"-pr", "--partitioner-range"}, description = "Use -pr to repair only the first range returned by the partitioner")
 +        private boolean primaryRange = false;
 +
 +        @Option(title = "incremental_repair", name = {"-inc", "--incremental"}, description = "Use -inc to use the new incremental repair")
 +        private boolean incrementalRepair = false;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +            String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +            if (primaryRange && (!specificDataCenters.isEmpty() || !specificHosts.isEmpty()))
 +                throw new RuntimeException("Primary range repair should be performed on all nodes in the cluster.");
 +
 +            for (String keyspace : keyspaces)
 +            {
 +                try
 +                {
 +                    RepairParallelism parallelismDegree = RepairParallelism.SEQUENTIAL;
 +                    if (parallel)
 +                        parallelismDegree = RepairParallelism.PARALLEL;
 +                    else if (dcParallel)
 +                        parallelismDegree = RepairParallelism.DATACENTER_AWARE;
 +
 +                    Collection<String> dataCenters = null;
 +                    Collection<String> hosts = null;
 +                    if (!specificDataCenters.isEmpty())
 +                        dataCenters = newArrayList(specificDataCenters);
 +                    else if (localDC)
 +                        dataCenters = newArrayList(probe.getDataCenter());
 +                    else if (!specificHosts.isEmpty())
 +                        hosts = newArrayList(specificHosts);
 +                    if (!startToken.isEmpty() || !endToken.isEmpty())
 +                        probe.forceRepairRangeAsync(System.out, keyspace, parallelismDegree, dataCenters, hosts, startToken, endToken, !incrementalRepair);
 +                    else
 +                        probe.forceRepairAsync(System.out, keyspace, parallelismDegree, dataCenters, hosts, primaryRange, !incrementalRepair, cfnames);
 +                } catch (Exception e)
 +                {
 +                    throw new RuntimeException("Error occurred during repair", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Command(name = "setcachecapacity", description = "Set global key, row, and counter cache capacities (in MB units)")
 +    public static class SetCacheCapacity extends NodeToolCmd
 +    {
 +        @Arguments(title = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
 +                   usage = "<key-cache-capacity> <row-cache-capacity> <counter-cache-capacity>",
 +                   description = "Key cache, row cache, and counter cache (in MB)",
 +                   required = true)
 +        private List<Integer> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 3, "setcachecapacity requires key-cache-capacity, row-cache-capacity, and counter-cache-capacity args.");
 +            probe.setCacheCapacities(args.get(0), args.get(1), args.get(2));
 +        }
 +    }
 +
 +    @Command(name = "setcompactionthreshold", description = "Set min and max compaction thresholds for a given column family")
 +    public static class SetCompactionThreshold extends NodeToolCmd
 +    {
 +        @Arguments(title = "<keyspace> <cfname> <minthreshold> <maxthreshold>", usage = "<keyspace> <cfname> <minthreshold> <maxthreshold>", description = "The keyspace, the column family, min and max threshold", required = true)
 +        private List<String> args = new ArrayList<>();
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            checkArgument(args.size() == 4, "setcompactionthreshold requires ks, cf, min, and max threshold args.");
 +
 +            int minthreshold = parseInt(args.get(2));
 +            int maxthreshold = parseInt(args.get(3));
 +            checkArgument(minthreshold >= 0 && maxthreshold >= 0, "Thresholds must be positive integers");
 +            checkArgument(minthreshold <= maxthreshold, "Min threshold cannot be greater than max.");
 +            checkArgument(minthreshold >= 2 || maxthreshold == 0, "Min threshold must be at least 2");
 +
 +            probe.setCompactionThreshold(args.get(0), args.get(1), minthreshold, maxthreshold);
 +        }
 +    }
 +
 +    @Command(name = "setcompactionthroughput", description = "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling")
 +    public static class SetCompactionThroughput extends NodeToolCmd
 +    {
 +        @Arguments(title = "compaction_throughput", usage = "<value_in_mb>", description = "Value in MB, 0 to disable throttling", required = true)
 +        private Integer compactionThroughput = null;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.setCompactionThroughput(compactionThroughput);
 +        }
 +    }
 +
 +    @Command(name = "sethintedhandoffthrottlekb", description = "Set hinted handoff throttle in KB per second, per delivery thread.")
 +    public static class SetHintedHandoffThrottleInKB extends NodeToolCmd
 +    {
 +        @Arguments(title = "throttle_in_kb", usage = "<value_in_kb_per_sec>", description = "Value in KB per second", required = true)
 +        private Integer throttleInKB = null;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.setHintedHandoffThrottleInKB(throttleInKB);
 +        }
 +    }
 +
 +    @Command(name = "setstreamthroughput", description = "Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling")
 +    public static class SetStreamThroughput extends NodeToolCmd
 +    {
 +        @Arguments(title = "stream_throughput", usage = "<value_in_mb>", description = "Value in Mb, 0 to disable throttling", required = true)
 +        private Integer streamThroughput = null;
 +
 +        @Override
 +        public void execute(NodeProbe probe)
 +        {
 +            probe.setStreamThroughput(streamThroughput);
 +        }
 +    }
 +
 +    @Command(name = "settraceprobability", description = "Sets the probability for tracing any given request. 0 disables tracing, 1 enables it for all requests; 0 is the default")
 +    public static class SetTraceProbability extends NodeToolCmd
 +    {
 +        @Arguments(title = "trace_probability", usage = "<value>", description = "Trace prob

<TRUNCATED>
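
Each subcommand above follows the same airline-style pattern: a @Command-annotated
class extending NodeToolCmd whose execute(NodeProbe) forwards to a single probe
call. A minimal sketch of a hypothetical extra command in that style (the command
name and class are illustrative only, not part of this patch):

    @Command(name = "disablecompactionthrottling", description = "Hypothetical example: disable compaction throttling")
    public static class DisableCompactionThrottling extends NodeToolCmd
    {
        @Override
        public void execute(NodeProbe probe)
        {
            // Equivalent to "setcompactionthroughput 0": a zero cap disables
            // throttling, as documented on SetCompactionThroughput above.
            probe.setCompactionThroughput(0);
        }
    }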

[2/5] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index 83d8f3a,0886edc..cc162d4
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@@ -30,7 -31,13 +30,13 @@@ public class AlwaysPresentFilter implem
  
      public void clear() { }
  
 -    public void close() throws IOException { }
 +    public void close() { }
  
      public long serializedSize() { return 0; }
+ 
+     @Override
+     public long offHeapSize()
+     {
+         return 0;
+     }
  }
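
Since AlwaysPresentFilter allocates no native memory, the new accessor trivially
reports zero; a hypothetical one-line sanity check:

    // A pass-through filter holds nothing off heap.
    assert new AlwaysPresentFilter().offHeapSize() == 0;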

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/utils/IFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/IFilter.java
index 91c0e36,aed5f39..60c0590
--- a/src/java/org/apache/cassandra/utils/IFilter.java
+++ b/src/java/org/apache/cassandra/utils/IFilter.java
@@@ -30,5 -30,9 +30,11 @@@ public interface IFilter extends Closea
  
      long serializedSize();
  
 +    void close();
++
+     /**
+      * Returns the amount of memory in bytes used off heap.
+      * @return the amount of memory in bytes used off heap
+      */
+     long offHeapSize();
  }
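
With offHeapSize() on the interface, callers can aggregate native memory usage
without knowing the concrete filter type. A minimal sketch, assuming some
collection of open filters (the helper is illustrative, not part of this commit):

    // Hypothetical helper: total off-heap bytes held by a set of filters.
    public static long totalOffHeapSize(Iterable<IFilter> filters)
    {
        long total = 0;
        for (IFilter filter : filters)
            total += filter.offHeapSize(); // 0 for AlwaysPresentFilter
        return total;
    }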

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/utils/obs/IBitSet.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/obs/IBitSet.java
index 96aac6b,47ba492..ed7e54b
--- a/src/java/org/apache/cassandra/utils/obs/IBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/IBitSet.java
@@@ -50,5 -50,9 +50,11 @@@ public interface IBitSet extends Closea
  
      public void clear();
  
 +    public void close();
++
+     /**
+      * Returns the amount of memory in bytes used off heap.
+      * @return the amount of memory in bytes used off heap
+      */
+     public long offHeapSize();
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/utils/obs/OpenBitSet.java
----------------------------------------------------------------------


[4/5] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/io/sstable/SSTableReader.java
	src/java/org/apache/cassandra/tools/NodeCmd.java
	src/java/org/apache/cassandra/utils/IFilter.java
	src/java/org/apache/cassandra/utils/obs/IBitSet.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9aaea248
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9aaea248
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9aaea248

Branch: refs/heads/trunk
Commit: 9aaea2485f9e79a4e9bd3fcf80070a3c7aa1be46
Parents: b106292 7a14a77
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 28 03:12:05 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 28 03:12:05 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 15 ++++
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 18 +++++
 .../io/compress/CompressionMetadata.java        |  9 +++
 .../cassandra/io/sstable/SSTableReader.java     | 21 ++++++
 .../cassandra/metrics/ColumnFamilyMetrics.java  | 36 ++++++++++
 .../cassandra/metrics/KeyspaceMetrics.java      | 27 +++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  4 ++
 .../org/apache/cassandra/tools/NodeTool.java    | 74 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  6 ++
 .../org/apache/cassandra/utils/IFilter.java     |  6 ++
 .../cassandra/utils/Murmur3BloomFilter.java     |  6 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  6 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  6 ++
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  6 ++
 15 files changed, 224 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e5f7c28,8f4add9..2f11996
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 -2.0.12:
 +2.1.3
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 +Merged from 2.0:
+  * Expose off-heap memory usage stats (CASSANDRA-7897)
   * Ignore Paxos commits for truncated tables (CASSANDRA-7538)
   * Validate size of indexed column values (CASSANDRA-8280)
   * Make LCS split compaction results over all data directories (CASSANDRA-8329)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 1fe4330,f0e9e65..0a34b4a
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -1186,7 -664,25 +1198,16 @@@ public class SSTableReader extends SSTa
      }
  
      /**
+      * Returns the amount of memory in bytes used off heap by the bloom filter.
+      * @return the amount of memory in bytes used off heap by the bloom filter
+      */
+     public long getBloomFilterOffHeapSize()
+     {
+         return bf.offHeapSize();
+     }
+ 
+     /**
 -     * Returns the amount of memory in bytes used off heap by the index summary.
 -     * @return the amount of memory in bytes used off heap by the index summary
 -     */
 -    public long getIndexSummaryOffHeapSize()
 -    {
 -        return indexSummary.offHeapSize();
 -    }
 -
 -    /**
 -     * @return An estimate of the number of keys in this SSTable.
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
       */
      public long estimatedKeys()
      {
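
getBloomFilterOffHeapSize() is the per-sstable building block; the table-level
BloomFilterOffHeapMemoryUsed gauge surfaced through NodeProbe below is effectively
a sum of it over the live sstables. A rough sketch of that aggregation (the method
is illustrative; the real gauge is registered in ColumnFamilyMetrics):

    // Hypothetical: bloom filter off-heap usage summed across sstables.
    private static long bloomFilterOffHeapTotal(Iterable<SSTableReader> sstables)
    {
        long total = 0;
        for (SSTableReader sstable : sstables)
            total += sstable.getBloomFilterOffHeapSize();
        return total;
    }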

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aaea248/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 1d05887,261d416..38d0f74
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -970,179 -922,6 +970,183 @@@ public class NodeProbe implements AutoC
          return spProxy.getReadRepairRepairedBackground();
      }
  
 +    // JMX getters for the o.a.c.metrics API below.
 +    /**
 +     * Retrieve cache metrics based on the cache type (KeyCache, RowCache, or CounterCache)
 +     * @param cacheType KeyCache, RowCache, or CounterCache
 +     * @param metricName Capacity, Entries, HitRate, Size, Requests or Hits.
 +     */
 +    public Object getCacheMetric(String cacheType, String metricName)
 +    {
 +        try
 +        {
 +            switch(metricName)
 +            {
 +                case "Capacity":
 +                case "Entries":
 +                case "HitRate":
 +                case "Size":
 +                    return JMX.newMBeanProxy(mbeanServerConn,
 +                            new ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + ",name=" + metricName),
 +                            JmxReporter.GaugeMBean.class).getValue();
 +                case "Requests":
 +                case "Hits":
 +                    return JMX.newMBeanProxy(mbeanServerConn,
 +                            new ObjectName("org.apache.cassandra.metrics:type=Cache,scope=" + cacheType + ",name=" + metricName),
 +                            JmxReporter.MeterMBean.class).getCount();
 +                default:
 +                    throw new RuntimeException("Unknown cache metric name.");
 +            }
 +        }
 +        catch (MalformedObjectNameException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Retrieve ColumnFamily metrics
 +     * @param ks Keyspace for which stats are to be displayed.
 +     * @param cf ColumnFamily for which stats are to be displayed.
 +     * @param metricName one of the metric names defined in {@link org.apache.cassandra.metrics.ColumnFamilyMetrics}.
 +     */
 +    public Object getColumnFamilyMetric(String ks, String cf, String metricName)
 +    {
 +        try
 +        {
 +            String type = cf.contains(".") ? "IndexColumnFamily" : "ColumnFamily";
 +            ObjectName oName = new ObjectName(String.format("org.apache.cassandra.metrics:type=%s,keyspace=%s,scope=%s,name=%s", type, ks, cf, metricName));
 +            switch(metricName)
 +            {
 +                case "BloomFilterDiskSpaceUsed":
 +                case "BloomFilterFalsePositives":
 +                case "BloomFilterFalseRatio":
++                case "BloomFilterOffHeapMemoryUsed":
++                case "IndexSummaryOffHeapMemoryUsed":
++                case "CompressionMetadataOffHeapMemoryUsed":
 +                case "CompressionRatio":
 +                case "EstimatedColumnCountHistogram":
 +                case "EstimatedRowSizeHistogram":
 +                case "KeyCacheHitRate":
 +                case "LiveSSTableCount":
 +                case "MaxRowSize":
 +                case "MeanRowSize":
 +                case "MemtableColumnsCount":
 +                case "MemtableLiveDataSize":
++                case "MemtableOffHeapSize":
 +                case "MinRowSize":
 +                case "RecentBloomFilterFalsePositives":
 +                case "RecentBloomFilterFalseRatio":
 +                case "SnapshotsSize":
 +                    return JMX.newMBeanProxy(mbeanServerConn, oName, JmxReporter.GaugeMBean.class).getValue();
 +                case "LiveDiskSpaceUsed":
 +                case "MemtableSwitchCount":
 +                case "SpeculativeRetries":
 +                case "TotalDiskSpaceUsed":
 +                case "WriteTotalLatency":
 +                case "ReadTotalLatency":
 +                case "PendingFlushes":
 +                    return JMX.newMBeanProxy(mbeanServerConn, oName, JmxReporter.CounterMBean.class).getCount();
 +                case "ReadLatency":
 +                case "CoordinatorReadLatency":
 +                case "CoordinatorScanLatency":
 +                case "WriteLatency":
 +                    return JMX.newMBeanProxy(mbeanServerConn, oName, JmxReporter.TimerMBean.class);
 +                case "LiveScannedHistogram":
 +                case "SSTablesPerReadHistogram":
 +                case "TombstoneScannedHistogram":
 +                    return JMX.newMBeanProxy(mbeanServerConn, oName, JmxReporter.HistogramMBean.class);
 +                default:
 +                    throw new RuntimeException("Unknown column family metric.");
 +            }
 +        }
 +        catch (MalformedObjectNameException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Retrieve Proxy metrics
 +     * @param scope RangeSlice, Read or Write
 +     */
 +    public JmxReporter.TimerMBean getProxyMetric(String scope)
 +    {
 +        try
 +        {
 +            return JMX.newMBeanProxy(mbeanServerConn,
 +                    new ObjectName("org.apache.cassandra.metrics:type=ClientRequest,scope=" + scope + ",name=Latency"),
 +                    JmxReporter.TimerMBean.class);
 +        }
 +        catch (MalformedObjectNameException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Retrieve Compaction metrics
 +     * @param metricName CompletedTasks, PendingTasks, BytesCompacted or TotalCompactionsCompleted.
 +     */
 +    public Object getCompactionMetric(String metricName)
 +    {
 +        try
 +        {
 +            switch(metricName)
 +            {
 +                case "BytesCompacted":
 +                    return JMX.newMBeanProxy(mbeanServerConn,
 +                            new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=" + metricName),
 +                            JmxReporter.CounterMBean.class);
 +                case "CompletedTasks":
 +                case "PendingTasks":
 +                    return JMX.newMBeanProxy(mbeanServerConn,
 +                            new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=" + metricName),
 +                            JmxReporter.GaugeMBean.class).getValue();
 +                case "TotalCompactionsCompleted":
 +                    return JMX.newMBeanProxy(mbeanServerConn,
 +                            new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=" + metricName),
 +                            JmxReporter.MeterMBean.class);
 +                default:
 +                    throw new RuntimeException("Unknown compaction metric.");
 +            }
 +        }
 +        catch (MalformedObjectNameException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    /**
 +     * Retrieve Storage metrics
 +     * @param metricName Exceptions, Load, TotalHints or TotalHintsInProgress.
 +     */
 +    public long getStorageMetric(String metricName)
 +    {
 +        try
 +        {
 +            return JMX.newMBeanProxy(mbeanServerConn,
 +                    new ObjectName("org.apache.cassandra.metrics:type=Storage,name=" + metricName),
 +                    JmxReporter.CounterMBean.class).getCount();
 +        }
 +        catch (MalformedObjectNameException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    public double[] metricPercentilesAsArray(JmxReporter.HistogramMBean metric)
 +    {
 +        return new double[]{ metric.get50thPercentile(),
 +                metric.get75thPercentile(),
 +                metric.get95thPercentile(),
 +                metric.get98thPercentile(),
 +                metric.get99thPercentile(),
 +                metric.getMin(),
 +                metric.getMax()};
 +    }
 +
      public TabularData getCompactionHistory()
      {
          return compactionProxy.getCompactionHistory();
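
A short usage sketch of the getters added above, as a hypothetical caller might
exercise them (the keyspace and table names are placeholders):

    // Gauge-valued metrics come back as raw values...
    Object bloom = probe.getColumnFamilyMetric("ks1", "cf1", "BloomFilterOffHeapMemoryUsed");
    Object summary = probe.getColumnFamilyMetric("ks1", "cf1", "IndexSummaryOffHeapMemoryUsed");
    Object compression = probe.getColumnFamilyMetric("ks1", "cf1", "CompressionMetadataOffHeapMemoryUsed");

    // ...while histogram-valued metrics come back as MBean proxies.
    JmxReporter.HistogramMBean perRead =
            (JmxReporter.HistogramMBean) probe.getColumnFamilyMetric("ks1", "cf1", "SSTablesPerReadHistogram");
    double[] percentiles = probe.metricPercentilesAsArray(perRead);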


[5/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/371ad9ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/371ad9ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/371ad9ee

Branch: refs/heads/trunk
Commit: 371ad9eea63d95e563ae82e492bbfa7e6028a98e
Parents: 794d68b 9aaea24
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Nov 28 03:12:51 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 28 03:12:51 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 15 ++++
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 18 +++++
 .../io/compress/CompressionMetadata.java        |  9 +++
 .../io/sstable/format/SSTableReader.java        | 21 ++++++
 .../cassandra/metrics/ColumnFamilyMetrics.java  | 36 ++++++++++
 .../cassandra/metrics/KeyspaceMetrics.java      | 27 +++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  4 ++
 .../org/apache/cassandra/tools/NodeTool.java    | 74 +++++++++++++++-----
 .../cassandra/utils/AlwaysPresentFilter.java    |  6 ++
 .../org/apache/cassandra/utils/IFilter.java     |  6 ++
 .../cassandra/utils/Murmur3BloomFilter.java     |  6 ++
 .../org/apache/cassandra/utils/obs/IBitSet.java |  6 ++
 .../cassandra/utils/obs/OffHeapBitSet.java      |  6 ++
 .../apache/cassandra/utils/obs/OpenBitSet.java  |  6 ++
 15 files changed, 224 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 90f3b92,0000000..c0d0abb
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1910 -1,0 +1,1931 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 +import org.apache.cassandra.io.compress.CompressedThrottledReader;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public abstract class SSTableReader extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    protected final AtomicInteger references = new AtomicInteger(1);
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    /**
 +     * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
 +     * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
 +     * On close we check if any of the closeable resources differ between any chains either side of us; any that are in neither of the adjacent links (if any) are closed.
 +     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare against only other still opened resources.
 +     */
 +    protected Object replaceLock = new Object();
 +    protected SSTableReader replacedBy;
 +    private SSTableReader replaces;
 +    private SSTableReader sharesBfWith;
 +    private SSTableDeletingTask deletingTask;
 +    private Runnable runOnClose;
 +
 +    @VisibleForTesting
 +    public RestorableMeter readMeter;
 +    protected ScheduledFuture readMeterSyncFuture;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If a cardinality estimator is available on all given sstables, then this method uses them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            count = 0; // reset the -1 sentinel so the summary-based estimate is not off by one
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode (such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +
 +        return sstable;
 +    }
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +
 +        return reader;
 +    }
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +
 +        deletingTask = new SSTableDeletingTask(this);
 +
 +        // Don't track read rates for tables in the system keyspace.  Also don't track reads for special operations (like early open)
 +        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
 +        if (SystemKeyspace.NAME.equals(desc.ksname) || openReason != OpenReason.NORMAL)
 +        {
 +            readMeter = null;
 +            readMeterSyncFuture = null;
 +            return;
 +        }
 +
 +        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (!isCompacted.get())
 +                {
 +                    meterSyncThrottle.acquire();
 +                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                }
 +            }
 +        }, 1, 5, TimeUnit.MINUTES);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    private void tidy(boolean release)
 +    {
 +        if (readMeterSyncFuture != null)
 +            readMeterSyncFuture.cancel(false);
 +
 +        if (references.get() != 0)
 +        {
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +        }
 +
 +        synchronized (replaceLock)
 +        {
 +            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 +
 +            if (replacedBy != null)
 +            {
 +                closeBf = replacedBy.bf != bf;
 +                closeSummary = replacedBy.indexSummary != indexSummary;
 +                closeFiles = replacedBy.dfile != dfile;
 +                // if the replacement sstablereader uses a different path, clean up our paths
 +                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
 +            }
 +
 +            if (replaces != null)
 +            {
 +                closeBf &= replaces.bf != bf;
 +                closeSummary &= replaces.indexSummary != indexSummary;
 +                closeFiles &= replaces.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
 +            }
 +
 +            if (sharesBfWith != null)
 +            {
 +                closeBf &= sharesBfWith.bf != bf;
 +                closeSummary &= sharesBfWith.indexSummary != indexSummary;
 +                closeFiles &= sharesBfWith.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
 +            }
 +
 +            boolean deleteAll = false;
 +            if (release && isCompacted.get())
 +            {
 +                assert replacedBy == null;
 +                if (replaces != null)
 +                {
 +                    replaces.replacedBy = null;
 +                    replaces.deletingTask = deletingTask;
 +                    replaces.markObsolete();
 +                }
 +                else
 +                {
 +                    deleteAll = true;
 +                }
 +            }
 +            else
 +            {
 +                if (replaces != null)
 +                    replaces.replacedBy = replacedBy;
 +                if (replacedBy != null)
 +                    replacedBy.replaces = replaces;
 +            }
 +
 +            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
 +        }
 +    }
 +
 +    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
 +    {
 +        if (references.get() != 0)
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +
 +        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +        final OpOrder.Barrier barrier;
 +        if (cfs != null)
 +        {
 +            barrier = cfs.readOrdering.newBarrier();
 +            barrier.issue();
 +        }
 +        else
 +            barrier = null;
 +
 +        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (barrier != null)
 +                    barrier.await();
 +                if (closeBf)
 +                    bf.close();
 +                if (closeSummary)
 +                    indexSummary.close();
 +                if (closeFiles)
 +                {
 +                    ifile.cleanup();
 +                    dfile.cleanup();
 +                }
 +                if (runOnClose != null)
 +                    runOnClose.run();
 +                if (deleteAll)
 +                {
 +                    /**
 +                     * Do the OS a favour and suggest (using an fadvise call) that we
 +                     * don't want to see pages of this SSTable in memory anymore.
 +                     *
 +                     * NOTE: We can't use madvise in java because it requires the address of
 +                     * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
 +                     */
 +                    dropPageCache();
 +                    deletingTask.run();
 +                }
 +                else if (deleteFiles)
 +                {
 +                    FileUtils.deleteWithConfirm(new File(dfile.path));
 +                    FileUtils.deleteWithConfirm(new File(ifile.path));
 +                }
 +            }
 +        });
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // the bf fp chance recorded in the sstable metadata differs from the configured value; rebuild the bf.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be rebuilt
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null;
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey;
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey.getKey());
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load the index summary from the Summary.db file, if it exists.
 +     *
 +     * If the loaded index summary has a different index interval from the current value stored in the schema,
 +     * the Summary.db file will be deleted and this method returns false, so that the summary gets rebuilt.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if the index summary was loaded successfully from the Summary.db file, false otherwise.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; close the stream, delete the file, and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // the write may have left a partial, corrupted file; delete it and let the summary be rebuilt on the next load
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +            replacedBy = replacement;
 +            replacement.replaces = this;
 +            replacement.replaceLock = replaceLock;
 +        }
 +    }
 +
 +    /**
 +     * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
 +     *
 +     * note that the reason we don't use replacedBy is that we are not yet actually replaced
 +     *
 +     * @param newReader
 +     */
 +    public void sharesBfWith(SSTableReader newReader)
 +    {
 +        assert openReason.equals(OpenReason.EARLY);
 +        this.sharesBfWith = newReader;
 +    }
 +
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
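 +            // if the sstable's effective start moves forward, we can advise the OS to drop the
 +            // now-unreachable prefix of the data and index files from the page cache on close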
 +            if (newStart.compareTo(this.first) > 0)
 +            {
 +                if (newStart.compareTo(this.last) > 0)
 +                {
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, 0);
 +                            CLibrary.trySkipCache(ifile.path, 0, 0);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +                else
 +                {
 +                    final long dataStart = getPosition(newStart, Operator.GE).position;
 +                    final long indexStart = getIndexScanPosition(newStart);
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +            }
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex);
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() throws IOException
 +    {
 +        indexSummary.close();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary).
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return -1;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
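 +            // worked example (illustrative): a binarySearch result of -3 encodes insertion point 2,
 +            // so greaterThan == 2 and we return index 1, the last summary entry preceding the key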
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        // We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
++     * Returns the amount of memory in bytes used off heap by the compression metadata.
++     * @return the amount of memory in bytes used off heap by the compression metadata
++     */
++    public long getCompressionMetadataOffHeapSize()
++    {
++        if (!compression)
++            return 0;
++
++        return getCompressionMetadata().offHeapSize();
++    }
++
++    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
++     * Returns the amount of memory in bytes used off heap by the bloom filter.
++     * @return the amount of memory in bytes used off heap by the bloom filter
++     */
++    public long getBloomFilterOffHeapSize()
++    {
++        return bf.offHeapSize();
++    }
++
++    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
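 +        // worked example (illustrative, assuming BASE_SAMPLING_LEVEL == 128): with min_index_interval == 128
 +        // and samplingLevel == 64, each sampled key stands for 128 * 128 / 64 == 256 keys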
 +        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // left bounds are start-exclusive
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end-inclusive, so we use the index preceding the one binarySearch gives us,
 +                // since that will be the last index we return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
 +            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
 +            long left = idxLeft == null ? -1 : idxLeft.position;
 +            if (left == -1)
 +                // left is past the end of the file
 +                continue;
 +            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
 +            long right = idxRight == null ? -1 : idxRight.position;
 +            if (right == -1 || Range.isWrapAround(range.left, range.right))
 +                // right is past the end of the file, or it wraps
 +                right = uncompressedLength();
 +            if (left == right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0)
 +        {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                    keyCacheHit.incrementAndGet();
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
 +
 +    //Corresponds to a name column
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    //Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        long sampledPosition = getIndexScanPosition(token);
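 +        // a scan position of -1 means the token sorts before the first summary entry,
 +        // so start scanning from the beginning of the index file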
 +        if (sampledPosition == -1)
 +            sampledPosition = 0;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * length())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    public boolean acquireReference()
 +    {
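 +        // classic CAS retry loop: refuse to resurrect a fully released reader (n <= 0),
 +        // otherwise atomically bump the count and report success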
 +        while (true)
 +        {
 +            int n = references.get();
 +            if (n <= 0)
 +                return false;
 +            if (references.compareAndSet(n, n + 1))
 +                return true;
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public int referenceCount()
 +    {
 +        return references.get();
 +    }
 +
 +    /**
 +     * Release reference to this SSTableReader.
 +     * If nothing else refers to this SSTable and it is marked as compacted,
 +     * all resources are cleaned up and files are deleted eventually.
 +     */
 +    public void releaseReference()
 +    {
 +        if (references.decrementAndGet() == 0)
 +            tidy(true);
 +        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null : getFilename();
 +        }
 +        return !isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +    /**
 +     * I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ICompactionScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in localhost current-millis time).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable, measured in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        synchronized (replaceLock)
 +        {
 +            SSTableReader cur = this, next = replacedBy;
 +            while (next != null)
 +            {
 +                cur = next;
 +                next = next.replacedBy;
 +            }
 +            return cur;
 +        }
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
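 +         *
 +         * For example (illustrative): GE.apply(-1) returns 1 because a cursor key smaller than the
 +         * target means the match, if any, still lies forward; GE.apply(0) returns 0, a match.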
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return compression
 +                ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
 +                : ThrottledReader.open(new File(getFilename()), limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return compression
 +                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
 +                : RandomAccessReader.open(new File(getFilename()));
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return RandomAccessReader.open(new File(getIndexFilename()));
 +    }
 +
 +    /**
 +     * @param component component to get timestamp.
 +     * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * @param sstables
 +     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
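 +     *
 +     * A typical usage sketch (illustrative; doRead is a hypothetical caller-side helper):
 +     * <pre>
 +     *     if (SSTableReader.acquireReferences(sstables))
 +     *     {
 +     *         try { doRead(sstables); }
 +     *         finally { SSTableReader.releaseReferences(sstables); }
 +     *     }
 +     * </pre>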
 +     */
 +    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    {
 +        SSTableReader failed = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (!sstable.acquireReference())
 +            {
 +                failed = sstable;
 +                break;
 +            }
 +        }
 +
 +        if (failed == null)
 +            return true;
 +
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable == failed)
 +                break;
 +            sstable.releaseReference();
 +        }
 +        return false;
 +    }
 +
 +    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    {
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sstable.releaseReference();
 +        }
 +    }
 +
 +    private void dropPageCache()
 +    {
 +        dropPageCache(dfile.path);
 +        dropPageCache(ifile.path);
 +    }
 +
 +    private void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
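 +    /**
 +     * A scanner over no data: it reports zero length and yields no rows.
 +     */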
 +    protected class EmptyCompactionScanner implements ICompactionScanner
 +    {
 +        private final String filename;
 +
 +        public EmptyCompactionScanner(String filename)
 +        {
 +            this.filename = filename;
 +        }
 +
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public String getBackingFiles()
 +        {
 +            return filename;
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            return false;
 +        }
 +
 +        public OnDiskAtomIterator next()
 +        {
 +            return null;
 +        }
 +
 +        public void close() throws IOException { }
 +
 +        public void remove() { }
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/371ad9ee/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------