You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/08/30 14:48:36 UTC

[1/3] git commit: New metrics; patch by yukim reviewed by brandonwilliams for CASSANDRA-4009

Updated Branches:
  refs/heads/trunk 0525ae25f -> 69cedbfca


New metrics; patch by yukim reviewed by brandonwilliams for CASSANDRA-4009


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/69cedbfc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/69cedbfc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/69cedbfc

Branch: refs/heads/trunk
Commit: 69cedbfcaad18f40dcf97648c8adf129614054db
Parents: 4e6167d
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 30 07:34:33 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 30 07:47:56 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cache/AutoSavingCache.java    |    2 +-
 .../apache/cassandra/cache/InstrumentingCache.java |   51 +--
 .../cassandra/concurrent/IExecutorMBean.java       |    4 +
 .../concurrent/JMXEnabledThreadPoolExecutor.java   |   26 +-
 .../JMXEnabledThreadPoolExecutorMBean.java         |    4 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   97 ++---
 .../cassandra/db/ColumnFamilyStoreMBean.java       |   76 ++++
 src/java/org/apache/cassandra/db/DataTracker.java  |  154 +------
 .../apache/cassandra/db/commitlog/CommitLog.java   |   14 +-
 .../cassandra/db/commitlog/CommitLogMBean.java     |    8 +-
 .../cassandra/db/compaction/CompactionManager.java |   96 ++---
 .../db/compaction/CompactionManagerMBean.java      |    8 +
 .../org/apache/cassandra/metrics/CacheMetrics.java |  108 +++++
 .../cassandra/metrics/ClientRequestMetrics.java    |   32 ++-
 .../cassandra/metrics/ColumnFamilyMetrics.java     |  341 +++++++++++++++
 .../apache/cassandra/metrics/CommitLogMetrics.java |   66 +++
 .../cassandra/metrics/CompactionMetrics.java       |  107 +++++
 .../cassandra/metrics/ConnectionMetrics.java       |  132 ++++++
 .../cassandra/metrics/DroppedMessageMetrics.java   |   54 +++
 .../apache/cassandra/metrics/LatencyMetrics.java   |  142 ++++++
 .../cassandra/metrics/MetricNameFactory.java       |   31 ++
 .../apache/cassandra/metrics/StorageMetrics.java   |   30 ++
 .../apache/cassandra/metrics/StreamingMetrics.java |   60 +++
 .../cassandra/metrics/ThreadPoolMetrics.java       |  116 +++++
 .../org/apache/cassandra/net/MessagingService.java |  101 ++---
 .../cassandra/net/OutboundTcpConnectionPool.java   |   23 +
 .../apache/cassandra/scheduler/WeightedQueue.java  |   18 +-
 .../cassandra/scheduler/WeightedQueueMBean.java    |    2 +
 .../org/apache/cassandra/service/CacheService.java |   33 +-
 .../cassandra/service/CacheServiceMBean.java       |   31 ++
 .../org/apache/cassandra/service/StorageProxy.java |   50 ++-
 .../cassandra/service/StorageProxyMBean.java       |   30 ++
 .../apache/cassandra/service/StorageService.java   |   10 +-
 .../cassandra/service/StorageServiceMBean.java     |    2 +
 .../apache/cassandra/streaming/FileStreamTask.java |   13 +-
 .../cassandra/streaming/IncomingStreamReader.java  |    7 +
 .../compress/CompressedFileStreamTask.java         |    9 +-
 38 files changed, 1628 insertions(+), 461 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 271f5b7..9d09d57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-dev
+ * new metrics (CASSANDRA-4009)
  * debug tracing (CASSANDRA-1123)
  * parallelize row cache loading (CASSANDRA-4282)
  * Make compaction, flush JBOD-aware (CASSANDRA-4292)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 2587619..d7c1cf7 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -59,7 +59,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
     public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
     {
-        super(cache);
+        super(cacheType.toString(), cache);
         this.cacheType = cacheType;
         this.cacheLoader = cacheloader;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/cache/InstrumentingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
index fda1c6b..9655e8d 100644
--- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java
+++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
@@ -18,23 +18,25 @@
 package org.apache.cassandra.cache;
 
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.metrics.CacheMetrics;
 
 /**
  * Wraps an ICache in requests + hits tracking.
  */
 public class InstrumentingCache<K, V>
 {
-    private final AtomicLong requests = new AtomicLong(0);
-    private final AtomicLong hits = new AtomicLong(0);
-    private final AtomicLong lastRequests = new AtomicLong(0);
-    private final AtomicLong lastHits = new AtomicLong(0);
     private volatile boolean capacitySetManually;
     private final ICache<K, V> map;
+    private final String type;
+
+    private CacheMetrics metrics;
 
-    public InstrumentingCache(ICache<K, V> map)
+    public InstrumentingCache(String type, ICache<K, V> map)
     {
         this.map = map;
+        this.type = type;
+        this.metrics = new CacheMetrics(type, map);
     }
 
     public void put(K key, V value)
@@ -55,9 +57,9 @@ public class InstrumentingCache<K, V>
     public V get(K key)
     {
         V v = map.get(key);
-        requests.incrementAndGet();
+        metrics.requests.mark();
         if (v != null)
-            hits.incrementAndGet();
+            metrics.hits.mark();
         return v;
     }
 
@@ -102,36 +104,10 @@ public class InstrumentingCache<K, V>
         return map.weightedSize();
     }
 
-    public long getHits()
-    {
-        return hits.get();
-    }
-
-    public long getRequests()
-    {
-        return requests.get();
-    }
-
-    public double getRecentHitRate()
-    {
-        long r = requests.get();
-        long h = hits.get();
-        try
-        {
-            return ((double)(h - lastHits.get())) / (r - lastRequests.get());
-        }
-        finally
-        {
-            lastRequests.set(r);
-            lastHits.set(h);
-        }
-    }
-
     public void clear()
     {
         map.clear();
-        requests.set(0);
-        hits.set(0);
+        metrics = new CacheMetrics(type, map);
     }
 
     public Set<K> getKeySet()
@@ -153,4 +129,9 @@ public class InstrumentingCache<K, V>
     {
         return map.isPutCopying();
     }
+
+    public CacheMetrics getMetrics()
+    {
+        return metrics;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
index 29292a7..f085b9c 100644
--- a/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
+++ b/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
@@ -17,6 +17,10 @@
  */
 package org.apache.cassandra.concurrent;
 
+/**
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated
 public interface IExecutorMBean
 {
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index f75a04b..52f9453 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -19,13 +19,14 @@ package org.apache.cassandra.concurrent;
 
 import java.lang.management.ManagementFactory;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
 /**
  * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
  * for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected
@@ -35,9 +36,7 @@ import javax.management.ObjectName;
 public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor implements JMXEnabledThreadPoolExecutorMBean
 {
     private final String mbeanName;
-
-    private final AtomicInteger totalBlocked = new AtomicInteger(0);
-    private final AtomicInteger currentBlocked = new AtomicInteger(0);
+    private final ThreadPoolMetrics metrics;
 
     public JMXEnabledThreadPoolExecutor(String threadPoolName)
     {
@@ -75,8 +74,11 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
         super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
         super.prestartAllCoreThreads();
 
+        metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
+
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
+
         try
         {
             mbs.registerMBean(this, new ObjectName(mbeanName));
@@ -102,6 +104,9 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
         {
             throw new RuntimeException(e);
         }
+
+        // release metrics
+        metrics.release();
     }
 
     @Override
@@ -146,31 +151,30 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
 
     public int getTotalBlockedTasks()
     {
-        return totalBlocked.get();
+        return (int) metrics.totalBlocked.count();
     }
 
     public int getCurrentlyBlockedTasks()
     {
-        return currentBlocked.get();
+        return (int) metrics.currentBlocked.count();
     }
 
     @Override
     protected void onInitialRejection(Runnable task)
     {
-        totalBlocked.incrementAndGet();
-        currentBlocked.incrementAndGet();
+        metrics.totalBlocked.inc();
+        metrics.currentBlocked.inc();
     }
 
     @Override
     protected void onFinalAccept(Runnable task)
     {
-        currentBlocked.decrementAndGet();
+        metrics.currentBlocked.dec();
     }
 
     @Override
     protected void onFinalRejection(Runnable task)
     {
-        currentBlocked.decrementAndGet();
+        metrics.currentBlocked.dec();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
index fbed2c4..48136bc 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
@@ -17,6 +17,10 @@
  */
 package org.apache.cassandra.concurrent;
 
+/**
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated
 public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
 {
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ef0e55d..464bac6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexExpression;
@@ -112,20 +113,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /* Memtables and SSTables on disk for this column family */
     private final DataTracker data;
 
-    private volatile int memtableSwitchCount = 0;
-
     /* This is used to generate the next index for a SSTable */
     private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
 
     public final SecondaryIndexManager indexManager;
 
-    private final LatencyTracker readStats = new LatencyTracker();
-    private final LatencyTracker writeStats = new LatencyTracker();
-
-    // counts of sstables accessed by reads
-    private final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
-    private final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
-
     private static final int INTERN_CUTOFF = 256;
     public final ConcurrentMap<ByteBuffer, ByteBuffer> internedNames = new NonBlockingHashMap<ByteBuffer, ByteBuffer>();
 
@@ -141,6 +133,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
+    public final ColumnFamilyMetrics metric;
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -216,6 +210,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.partitioner = partitioner;
         this.directories = directories;
         this.indexManager = new SecondaryIndexManager(this);
+        this.metric = new ColumnFamilyMetrics(this);
         fileIndexGenerator.set(generation);
 
         Caching caching = metadata.getCaching();
@@ -308,21 +303,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         ObjectName nameObj = new ObjectName(mbeanName);
         if (mbs.isRegistered(nameObj))
             mbs.unregisterMBean(nameObj);
+
+        // unregister metrics
+        metric.release();
     }
 
     public long getMinRowSize()
     {
-        return data.getMinRowSize();
+        return metric.minRowSize.value();
     }
 
     public long getMaxRowSize()
     {
-        return data.getMaxRowSize();
+        return metric.maxRowSize.value();
     }
 
     public long getMeanRowSize()
     {
-        return data.getMeanRowSize();
+        return metric.meanRowSize.value();
     }
 
     public int getMeanColumns()
@@ -615,9 +613,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 memtable.flushAndSignal(latch, flushWriter, ctx);
             }
 
-            if (memtableSwitchCount == Integer.MAX_VALUE)
-                memtableSwitchCount = 0;
-            memtableSwitchCount++;
+            if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
+                metric.memtableSwitchCount.clear();
+            metric.memtableSwitchCount.inc();
 
             // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
             // a second executor makes sure the onMemtableFlushes get called in the right order,
@@ -721,13 +719,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Memtable mt = getMemtableThreadSafe();
         mt.put(key, columnFamily);
         updateRowCache(key, columnFamily);
-        writeStats.addNano(System.nanoTime() - start);
+        metric.writeLatency.addNano(System.nanoTime() - start);
 
         // recompute liveRatio, if we have doubled the number of ops since last calculated
         while (true)
         {
             long last = liveRatioComputedAt.get();
-            long operations = writeStats.getOpCount();
+            long operations = metric.writeLatency.latency.count();
             if (operations < 2 * last)
                 break;
             if (liveRatioComputedAt.compareAndSet(last, operations))
@@ -955,12 +953,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long getMemtableColumnsCount()
     {
-        return getMemtableThreadSafe().getOperations();
+        return metric.memtableColumnsCount.value();
     }
 
     public long getMemtableDataSize()
     {
-        return getMemtableThreadSafe().getLiveSize();
+        return metric.memtableDataSize.value();
     }
 
     public long getTotalMemtableLiveSize()
@@ -970,7 +968,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getMemtableSwitchCount()
     {
-        return memtableSwitchCount;
+        return (int) metric.memtableSwitchCount.count();
     }
 
     /**
@@ -1008,68 +1006,67 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long[] getRecentSSTablesPerReadHistogram()
     {
-        return recentSSTablesPerRead.getBuckets(true);
+        return metric.recentSSTablesPerRead.getBuckets(true);
     }
 
     public long[] getSSTablesPerReadHistogram()
     {
-        return sstablesPerRead.getBuckets(false);
+        return metric.sstablesPerRead.getBuckets(false);
     }
 
     public long getReadCount()
     {
-        return readStats.getOpCount();
+        return metric.readLatency.latency.count();
     }
 
     public double getRecentReadLatencyMicros()
     {
-        return readStats.getRecentLatencyMicros();
+        return metric.readLatency.getRecentLatency();
     }
 
     public long[] getLifetimeReadLatencyHistogramMicros()
     {
-        return readStats.getTotalLatencyHistogramMicros();
+        return metric.readLatency.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentReadLatencyHistogramMicros()
     {
-        return readStats.getRecentLatencyHistogramMicros();
+        return metric.readLatency.recentLatencyHistogram.getBuckets(true);
     }
 
     public long getTotalReadLatencyMicros()
     {
-        return readStats.getTotalLatencyMicros();
+        return metric.readLatency.totalLatency.count();
     }
 
-// TODO this actually isn't a good meature of pending tasks
     public int getPendingTasks()
     {
-        return Table.switchLock.getQueueLength();
+        return metric.pendingTasks.value();
     }
 
     public long getWriteCount()
     {
-        return writeStats.getOpCount();
+        return metric.writeLatency.latency.count();
     }
 
     public long getTotalWriteLatencyMicros()
     {
-        return writeStats.getTotalLatencyMicros();
+        return metric.writeLatency.totalLatency.count();
     }
 
     public double getRecentWriteLatencyMicros()
     {
-        return writeStats.getRecentLatencyMicros();
+        return metric.writeLatency.getRecentLatency();
     }
 
     public long[] getLifetimeWriteLatencyHistogramMicros()
     {
-        return writeStats.getTotalLatencyHistogramMicros();
+        return metric.writeLatency.totalLatencyHistogram.getBuckets(false);
     }
 
     public long[] getRecentWriteLatencyHistogramMicros()
     {
-        return writeStats.getRecentLatencyHistogramMicros();
+        return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
     }
 
     public ColumnFamily getColumnFamily(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
@@ -1189,7 +1186,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
         finally
         {
-            readStats.addNano(System.nanoTime() - start);
+            metric.readLatency.addNano(System.nanoTime() - start);
         }
 
         logger.debug("Read {} columns", result == null ? 0 : result.getColumnCount());
@@ -1317,8 +1314,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         CollationController controller = new CollationController(this, forCache, filter, gcBefore);
         ColumnFamily columns = controller.getTopLevelColumns();
-        recentSSTablesPerRead.add(controller.getSstablesIterated());
-        sstablesPerRead.add(controller.getSstablesIterated());
+        metric.updateSSTableIterated(controller.getSstablesIterated());
         return columns;
     }
 
@@ -1548,22 +1544,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public boolean hasUnreclaimedSpace()
     {
-        return data.getLiveSize() < data.getTotalSize();
+        return getLiveDiskSpaceUsed() < getTotalDiskSpaceUsed();
     }
 
     public long getTotalDiskSpaceUsed()
     {
-        return data.getTotalSize();
+        return metric.totalDiskSpaceUsed.count();
     }
 
     public long getLiveDiskSpaceUsed()
     {
-        return data.getLiveSize();
+        return metric.liveDiskSpaceUsed.count();
     }
 
     public int getLiveSSTableCount()
     {
-        return data.getSSTables().size();
+        return metric.liveSSTableCount.value();
     }
 
     /**
@@ -1777,30 +1773,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long getBloomFilterFalsePositives()
     {
-        return data.getBloomFilterFalsePositives();
+        return metric.bloomFilterFalsePositives.value();
     }
 
     public long getRecentBloomFilterFalsePositives()
     {
-        return data.getRecentBloomFilterFalsePositives();
+        return metric.recentBloomFilterFalsePositives.value();
     }
 
     public double getBloomFilterFalseRatio()
     {
-        return data.getBloomFilterFalseRatio();
+        return metric.bloomFilterFalseRatio.value();
     }
 
     public double getRecentBloomFilterFalseRatio()
     {
-        return data.getRecentBloomFilterFalseRatio();
+        return metric.recentBloomFilterFalseRatio.value();
     }
 
     public long getBloomFilterDiskSpaceUsed()
     {
-        long total = 0;
-        for (SSTableReader sst : getSSTables())
-            total += sst.getBloomFilterSerializedSize();
-        return total;
+        return metric.bloomFilterDiskSpaceUsed.value();
     }
 
     @Override
@@ -1887,17 +1880,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long[] getEstimatedRowSizeHistogram()
     {
-        return data.getEstimatedRowSizeHistogram();
+        return metric.estimatedRowSizeHistogram.value();
     }
 
     public long[] getEstimatedColumnCountHistogram()
     {
-        return data.getEstimatedColumnCountHistogram();
+        return metric.estimatedColumnCountHistogram.value();
     }
 
     public double getCompressionRatio()
     {
-        return data.getCompressionRatio();
+        return metric.compressionRatio.value();
     }
 
     /** true if this CFS contains secondary index data */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 20baae9..8e7fd9d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -37,103 +37,141 @@ public interface ColumnFamilyStoreMBean
      * Returns the total amount of data stored in the memtable, including
      * column related overhead.
      *
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableDataSize
      * @return The size in bytes.
      */
+    @Deprecated
     public long getMemtableDataSize();
 
     /**
      * Returns the total number of columns present in the memtable.
      *
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableColumnsCount
      * @return The number of columns.
      */
+    @Deprecated
     public long getMemtableColumnsCount();
 
     /**
      * Returns the number of times that a flush has resulted in the
      * memtable being switched out.
      *
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableSwitchCount
      * @return the number of memtable switches
      */
+    @Deprecated
     public int getMemtableSwitchCount();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentSSTablesPerReadHistogram
      * @return a histogram of the number of sstable data files accessed per read: reading this property resets it
      */
+    @Deprecated
     public long[] getRecentSSTablesPerReadHistogram();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#sstablesPerReadHistogram
      * @return a histogram of the number of sstable data files accessed per read
      */
+    @Deprecated
     public long[] getSSTablesPerReadHistogram();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
      * @return the number of read operations on this column family
      */
+    @Deprecated
     public long getReadCount();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
      * @return total read latency (divide by getReadCount() for average)
      */
+    @Deprecated
     public long getTotalReadLatencyMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
      * @return an array representing the latency histogram
      */
+    @Deprecated
     public long[] getLifetimeReadLatencyHistogramMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
      * @return an array representing the latency histogram
      */
+    @Deprecated
     public long[] getRecentReadLatencyHistogramMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
      * @return average latency per read operation since the last call
      */
+    @Deprecated
     public double getRecentReadLatencyMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
      * @return the number of write operations on this column family
      */
+    @Deprecated
     public long getWriteCount();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
      * @return total write latency (divide by getReadCount() for average)
      */
+    @Deprecated
     public long getTotalWriteLatencyMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
      * @return an array representing the latency histogram
      */
+    @Deprecated
     public long[] getLifetimeWriteLatencyHistogramMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
      * @return an array representing the latency histogram
      */
+    @Deprecated
     public long[] getRecentWriteLatencyHistogramMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
      * @return average latency per write operation since the last call
      */
+    @Deprecated
     public double getRecentWriteLatencyMicros();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingTasks
      * @return the estimated number of tasks pending for this column family
      */
+    @Deprecated
     public int getPendingTasks();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#liveSSTableCount
      * @return the number of SSTables on disk for this CF
      */
+    @Deprecated
     public int getLiveSSTableCount();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#liveDiskSpaceUsed
      * @return disk space used by SSTables belonging to this CF
      */
+    @Deprecated
     public long getLiveDiskSpaceUsed();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#totalDiskSpaceUsed
      * @return total disk space used by SSTables belonging to this CF, including obsolete ones waiting to be GC'd
      */
+    @Deprecated
     public long getTotalDiskSpaceUsed();
 
     /**
@@ -142,28 +180,54 @@ public interface ColumnFamilyStoreMBean
     public void forceMajorCompaction() throws ExecutionException, InterruptedException;
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#minRowSize
      * @return the size of the smallest compacted row
      */
+    @Deprecated
     public long getMinRowSize();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#maxRowSize
      * @return the size of the largest compacted row
      */
+    @Deprecated
     public long getMaxRowSize();
 
     /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#meanRowSize
      * @return the size of the smallest compacted row
      */
+    @Deprecated
     public long getMeanRowSize();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterFalsePositives
+     */
+    @Deprecated
     public long getBloomFilterFalsePositives();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentBloomFilterFalsePositives
+     */
+    @Deprecated
     public long getRecentBloomFilterFalsePositives();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterFalseRatio
+     */
+    @Deprecated
     public double getBloomFilterFalseRatio();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentBloomFilterFalseRatio
+     */
+    @Deprecated
     public double getRecentBloomFilterFalseRatio();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterDiskSpaceUsed
+     */
+    @Deprecated
     public long getBloomFilterDiskSpaceUsed();
 
     /**
@@ -220,8 +284,20 @@ public interface ColumnFamilyStoreMBean
 
     public long estimateKeys();
 
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#estimatedRowSizeHistogram
+     */
+    @Deprecated
     public long[] getEstimatedRowSizeHistogram();
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#estimatedColumnCountHistogram
+     */
+    @Deprecated
     public long[] getEstimatedColumnCountHistogram();
+    /**
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#compressionRatio
+     */
+    @Deprecated
     public double getCompressionRatio();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8233fbe..1e7b1bf 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.*;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -47,10 +47,6 @@ public class DataTracker
     public final ColumnFamilyStore cfstore;
     private final AtomicReference<View> view;
 
-    // On disk live and total size
-    private final AtomicLong liveSize = new AtomicLong();
-    private final AtomicLong totalSize = new AtomicLong();
-
     public DataTracker(ColumnFamilyStore cfstore)
     {
         this.cfstore = cfstore;
@@ -360,8 +356,9 @@ public class DataTracker
                 logger.debug(String.format("adding %s to list of files tracked for %s.%s",
                             sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
             long size = sstable.bytesOnDisk();
-            liveSize.addAndGet(size);
-            totalSize.addAndGet(size);
+            StorageMetrics.load.inc(size);
+            cfstore.metric.liveDiskSpaceUsed.inc(size);
+            cfstore.metric.totalDiskSpaceUsed.inc(size);
             sstable.setTrackedBy(this);
         }
     }
@@ -373,26 +370,18 @@ public class DataTracker
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files tracked for %s.%s",
                             sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
-            liveSize.addAndGet(-sstable.bytesOnDisk());
+            long size = sstable.bytesOnDisk();
+            StorageMetrics.load.dec(size);
+            cfstore.metric.liveDiskSpaceUsed.dec(size);
             boolean firstToCompact = sstable.markCompacted();
             assert firstToCompact : sstable + " was already marked compacted";
             sstable.releaseReference();
         }
     }
 
-    public long getLiveSize()
-    {
-        return liveSize.get();
-    }
-
-    public long getTotalSize()
-    {
-        return totalSize.get();
-    }
-
     public void spaceReclaimed(long size)
     {
-        totalSize.addAndGet(-size);
+        cfstore.metric.totalDiskSpaceUsed.dec(size);
     }
 
     public long estimatedKeys()
@@ -405,85 +394,6 @@ public class DataTracker
         return n;
     }
 
-    public long[] getEstimatedRowSizeHistogram()
-    {
-        long[] histogram = new long[90];
-
-        for (SSTableReader sstable : getSSTables())
-        {
-            long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
-
-            for (int i = 0; i < histogram.length; i++)
-                histogram[i] += rowSize[i];
-        }
-
-        return histogram;
-    }
-
-    public long[] getEstimatedColumnCountHistogram()
-    {
-        long[] histogram = new long[90];
-
-        for (SSTableReader sstable : getSSTables())
-        {
-            long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
-
-            for (int i = 0; i < histogram.length; i++)
-                histogram[i] += columnSize[i];
-        }
-
-        return histogram;
-    }
-
-    public double getCompressionRatio()
-    {
-        double sum = 0;
-        int total = 0;
-        for (SSTableReader sstable : getSSTables())
-        {
-            if (sstable.getCompressionRatio() != Double.MIN_VALUE)
-            {
-                sum += sstable.getCompressionRatio();
-                total++;
-            }
-        }
-        return total != 0 ? (double)sum/total: 0;
-    }
-
-    public long getMinRowSize()
-    {
-        long min = 0;
-        for (SSTableReader sstable : getSSTables())
-        {
-            if (min == 0 || sstable.getEstimatedRowSize().min() < min)
-                min = sstable.getEstimatedRowSize().min();
-        }
-        return min;
-    }
-
-    public long getMaxRowSize()
-    {
-        long max = 0;
-        for (SSTableReader sstable : getSSTables())
-        {
-            if (sstable.getEstimatedRowSize().max() > max)
-                max = sstable.getEstimatedRowSize().max();
-        }
-        return max;
-    }
-
-    public long getMeanRowSize()
-    {
-        long sum = 0;
-        long count = 0;
-        for (SSTableReader sstable : getSSTables())
-        {
-            sum += sstable.getEstimatedRowSize().mean();
-            count++;
-        }
-        return count > 0 ? sum / count : 0;
-    }
-
     public int getMeanColumns()
     {
         long sum = 0;
@@ -496,54 +406,6 @@ public class DataTracker
         return count > 0 ? (int) (sum / count) : 0;
     }
 
-    public long getBloomFilterFalsePositives()
-    {
-        long count = 0L;
-        for (SSTableReader sstable: getSSTables())
-        {
-            count += sstable.getBloomFilterFalsePositiveCount();
-        }
-        return count;
-    }
-
-    public long getRecentBloomFilterFalsePositives()
-    {
-        long count = 0L;
-        for (SSTableReader sstable: getSSTables())
-        {
-            count += sstable.getRecentBloomFilterFalsePositiveCount();
-        }
-        return count;
-    }
-
-    public double getBloomFilterFalseRatio()
-    {
-        long falseCount = 0L;
-        long trueCount = 0L;
-        for (SSTableReader sstable: getSSTables())
-        {
-            falseCount += sstable.getBloomFilterFalsePositiveCount();
-            trueCount += sstable.getBloomFilterTruePositiveCount();
-        }
-        if (falseCount == 0L && trueCount == 0L)
-            return 0d;
-        return (double) falseCount / (trueCount + falseCount);
-    }
-
-    public double getRecentBloomFilterFalseRatio()
-    {
-        long falseCount = 0L;
-        long trueCount = 0L;
-        for (SSTableReader sstable: getSSTables())
-        {
-            falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
-            trueCount += sstable.getRecentBloomFilterTruePositiveCount();
-        }
-        if (falseCount == 0L && trueCount == 0L)
-            return 0d;
-        return (double) falseCount / (trueCount + falseCount);
-    }
-
     public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
     {
         for (INotificationConsumer subscriber : subscribers)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index e3f3c13..22abcb7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
 
 /*
@@ -58,6 +59,8 @@ public class CommitLog implements CommitLogMBean
 
     public CommitLogSegment activeSegment;
 
+    private final CommitLogMetrics metrics;
+
     private CommitLog()
     {
         DatabaseDescriptor.createAllDirectories();
@@ -78,6 +81,9 @@ public class CommitLog implements CommitLogMBean
         {
             throw new RuntimeException(e);
         }
+
+        // register metrics
+        metrics = new CommitLogMetrics(executor, allocator);
     }
 
     /**
@@ -272,7 +278,7 @@ public class CommitLog implements CommitLogMBean
      */
     public long getCompletedTasks()
     {
-        return executor.getCompletedTasks();
+        return metrics.completedTasks.value();
     }
 
     /**
@@ -280,7 +286,7 @@ public class CommitLog implements CommitLogMBean
      */
     public long getPendingTasks()
     {
-        return executor.getPendingTasks();
+        return metrics.pendingTasks.value();
     }
 
     /**
@@ -288,7 +294,7 @@ public class CommitLog implements CommitLogMBean
      */
     public long getTotalCommitlogSize()
     {
-        return allocator.bytesUsed();
+        return metrics.totalCommitLogSize.value();
     }
 
     /**
@@ -330,7 +336,7 @@ public class CommitLog implements CommitLogMBean
             segmentNames.add(segment.getName());
         return segmentNames;
     }
-    
+
     public List<String> getArchivingSegmentNames()
     {
         return new ArrayList<String>(archiver.archivePending.keySet());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 3570dc4..6c0d8d7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -25,17 +25,23 @@ public interface CommitLogMBean
 {
     /**
      * Get the number of completed tasks
+     * @see org.apache.cassandra.metrics.CommitLogMetrics#completedTasks
      */
+    @Deprecated
     public long getCompletedTasks();
 
     /**
      * Get the number of tasks waiting to be executed
+     * @see org.apache.cassandra.metrics.CommitLogMetrics#pendingTasks
      */
+    @Deprecated
     public long getPendingTasks();
 
     /**
      * Get the current size used by all the commitlog segments.
+     * @see org.apache.cassandra.metrics.CommitLogMetrics#totalCommitLogSize
      */
+    @Deprecated
     public long getTotalCommitlogSize();
 
     /**
@@ -52,4 +58,4 @@ public interface CommitLogMBean
      * @return Files which are pending for archival attempt.  Does NOT include failed archive attempts.
      */
     public List<String> getArchivingSegmentNames();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5526dc1..7474191 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.*;
@@ -99,6 +100,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     private final CompactionExecutor executor = new CompactionExecutor();
     private final CompactionExecutor validationExecutor = new ValidationExecutor();
+    private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
 
     /**
      * @return A lock, for which acquisition means no compactions can run.
@@ -148,7 +150,7 @@ public class CompactionManager implements CompactionManagerMBean
 
                     try
                     {
-                        task.execute(executor);
+                        task.execute(metrics);
                     }
                     finally
                     {
@@ -238,7 +240,7 @@ public class CompactionManager implements CompactionManagerMBean
                     CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), NO_GC);
                     task.isUserDefined(true);
                     task.setCompactionType(OperationType.UPGRADE_SSTABLES);
-                    task.execute(executor);
+                    task.execute(metrics);
                 }
             }
         });
@@ -293,7 +295,7 @@ public class CompactionManager implements CompactionManagerMBean
                         compactionLock.writeLock().unlock();
                         try
                         {
-                            task.execute(executor);
+                            task.execute(metrics);
                         }
                         finally
                         {
@@ -390,7 +392,7 @@ public class CompactionManager implements CompactionManagerMBean
                             {
                                 AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                                 AbstractCompactionTask task = strategy.getUserDefinedTask(toCompact, gcBefore);
-                                task.execute(executor);
+                                task.execute(metrics);
                             }
                             finally
                             {
@@ -488,7 +490,7 @@ public class CompactionManager implements CompactionManagerMBean
         Scrubber scrubber = new Scrubber(cfs, sstable);
 
         CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
-        executor.beginCompaction(scrubInfo);
+        metrics.beginCompaction(scrubInfo);
         try
         {
             scrubber.scrub();
@@ -496,7 +498,7 @@ public class CompactionManager implements CompactionManagerMBean
         finally
         {
             scrubber.close();
-            executor.finishCompaction(scrubInfo);
+            metrics.finishCompaction(scrubInfo);
         }
 
         if (scrubber.getNewInOrderSSTable() != null)
@@ -561,7 +563,7 @@ public class CompactionManager implements CompactionManagerMBean
             List<IColumn> indexedColumnsInRow = null;
 
             CleanupInfo ci = new CleanupInfo(sstable, scanner);
-            executor.beginCompaction(ci);
+            metrics.beginCompaction(ci);
             try
             {
                 while (scanner.hasNext())
@@ -631,7 +633,7 @@ public class CompactionManager implements CompactionManagerMBean
             finally
             {
                 scanner.close();
-                executor.finishCompaction(ci);
+                metrics.finishCompaction(ci);
             }
 
             List<SSTableReader> results = new ArrayList<SSTableReader>(1);
@@ -711,7 +713,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-        validationExecutor.beginCompaction(ci);
+        metrics.beginCompaction(ci);
         try
         {
             Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
@@ -734,7 +736,7 @@ public class CompactionManager implements CompactionManagerMBean
             if (cfs.table.snapshotExists(validator.request.sessionid))
                 cfs.table.clearSnapshot(validator.request.sessionid);
 
-            validationExecutor.finishCompaction(ci);
+            metrics.finishCompaction(ci);
         }
     }
 
@@ -750,14 +752,14 @@ public class CompactionManager implements CompactionManagerMBean
                 compactionLock.readLock().lock();
                 try
                 {
-                    executor.beginCompaction(builder);
+                    metrics.beginCompaction(builder);
                     try
                     {
                         builder.build();
                     }
                     finally
                     {
-                        executor.finishCompaction(builder);
+                        metrics.finishCompaction(builder);
                     }
                 }
                 finally
@@ -790,14 +792,14 @@ public class CompactionManager implements CompactionManagerMBean
                 }
                 try
                 {
-                    executor.beginCompaction(writer);
+                    metrics.beginCompaction(writer);
                     try
                     {
                         writer.saveCache();
                     }
                     finally
                     {
-                        executor.finishCompaction(writer);
+                        metrics.finishCompaction(writer);
                     }
                 }
                 finally
@@ -873,15 +875,11 @@ public class CompactionManager implements CompactionManagerMBean
 
     public int getActiveCompactions()
     {
-        return CompactionExecutor.compactions.size();
+        return CompactionMetrics.getCompactions().size();
     }
 
-    private static class CompactionExecutor extends ThreadPoolExecutor implements CompactionExecutorStatsCollector
+    private static class CompactionExecutor extends ThreadPoolExecutor
     {
-        // a synchronized identity set of running tasks to their compaction info
-        private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
-        private volatile long totalBytesCompacted = 0L;
-        private volatile long totalCompactionsCompleted = 0L;
 
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
@@ -899,40 +897,6 @@ public class CompactionManager implements CompactionManagerMBean
             this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), "CompactionExecutor");
         }
 
-        public void beginCompaction(CompactionInfo.Holder ci)
-        {
-            // notify
-            ci.started();
-            compactions.add(ci);
-        }
-
-        public void finishCompaction(CompactionInfo.Holder ci)
-        {
-            // notify
-            ci.finished();
-            compactions.remove(ci);
-            totalBytesCompacted += ci.getCompactionInfo().getTotal();
-            totalCompactionsCompleted += 1;
-        }
-
-        public static List<CompactionInfo.Holder> getCompactions()
-        {
-            return new ArrayList<CompactionInfo.Holder>(compactions);
-        }
-
-        public long getTotalBytesCompacted()
-        {
-            long bytesCompletedInProgress = 0L;
-            for (CompactionInfo.Holder ci : compactions)
-                bytesCompletedInProgress += ci.getCompactionInfo().getCompleted();
-            return bytesCompletedInProgress + totalBytesCompacted;
-        }
-
-        public long getTotalCompactionsCompleted()
-        {
-            return totalCompactionsCompleted;
-        }
-
         // modified from DebuggableThreadPoolExecutor so that CompactionInterruptedExceptions are not logged
         @Override
         public void afterExecute(Runnable r, Throwable t)
@@ -973,7 +937,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<Map<String, String>> getCompactions()
     {
-        List<Holder> compactionHolders = CompactionExecutor.getCompactions();
+        List<Holder> compactionHolders = CompactionMetrics.getCompactions();
         List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size());
         for (CompactionInfo.Holder ci : compactionHolders)
             out.add(ci.getCompactionInfo().asMap());
@@ -982,7 +946,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     public List<String> getCompactionSummary()
     {
-        List<Holder> compactionHolders = CompactionExecutor.getCompactions();
+        List<Holder> compactionHolders = CompactionMetrics.getCompactions();
         List<String> out = new ArrayList<String>(compactionHolders.size());
         for (CompactionInfo.Holder ci : compactionHolders)
             out.add(ci.getCompactionInfo().toString());
@@ -991,30 +955,22 @@ public class CompactionManager implements CompactionManagerMBean
 
     public long getTotalBytesCompacted()
     {
-        return executor.getTotalBytesCompacted() + validationExecutor.getTotalBytesCompacted();
+        return metrics.bytesCompacted.count();
     }
 
     public long getTotalCompactionsCompleted()
     {
-        return executor.getTotalCompactionsCompleted() + validationExecutor.getTotalCompactionsCompleted();
+        return metrics.totalCompactionsCompleted.count();
     }
 
     public int getPendingTasks()
     {
-        int n = 0;
-        for (String tableName : Schema.instance.getTables())
-        {
-            for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
-            {
-                n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
-            }
-        }
-        return (int) (executor.getTaskCount() + validationExecutor.getTaskCount() - executor.getCompletedTaskCount() - validationExecutor.getCompletedTaskCount()) + n;
+        return metrics.pendingTasks.value();
     }
 
     public long getCompletedTasks()
     {
-        return executor.getCompletedTaskCount() + validationExecutor.getCompletedTaskCount();
+        return metrics.completedTasks.value();
     }
 
     private static class SimpleFuture implements Future
@@ -1083,7 +1039,7 @@ public class CompactionManager implements CompactionManagerMBean
     public void stopCompaction(String type)
     {
         OperationType operation = OperationType.valueOf(type);
-        for (Holder holder : CompactionExecutor.getCompactions())
+        for (Holder holder : CompactionMetrics.getCompactions())
         {
             if (holder.getCompactionInfo().getTaskType() == operation)
                 holder.stop();
@@ -1100,7 +1056,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         assert columnFamilies != null;
 
-        for (Holder compactionHolder : CompactionExecutor.getCompactions())
+        for (Holder compactionHolder : CompactionMetrics.getCompactions())
         {
             CompactionInfo info = compactionHolder.getCompactionInfo();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 14fad26..5f1a156 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -29,23 +29,31 @@ public interface CompactionManagerMBean
     public List<String> getCompactionSummary();
 
     /**
+     * @see org.apache.cassandra.metrics.CompactionMetrics#pendingTasks
      * @return estimated number of compactions remaining to perform
      */
+    @Deprecated
     public int getPendingTasks();
 
     /**
+     * @see org.apache.cassandra.metrics.CompactionMetrics#completedTasks
      * @return number of completed compactions since server [re]start
      */
+    @Deprecated
     public long getCompletedTasks();
 
     /**
+     * @see org.apache.cassandra.metrics.CompactionMetrics#totalBytesCompacted
      * @return total number of bytes compacted since server [re]start
      */
+    @Deprecated
     public long getTotalBytesCompacted();
 
     /**
+     * @see org.apache.cassandra.metrics.CompactionMetrics#totalCompactionsCompleted
      * @return total number of compactions since server [re]start
      */
+    @Deprecated
     public long getTotalCompactionsCompleted();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMetrics.java b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
new file mode 100644
index 0000000..dee9319
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+
+import org.apache.cassandra.cache.ICache;
+
+/**
+ * Metrics for {@code ICache}.
+ */
+public class CacheMetrics
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "Cache";
+
+    /** Cache capacity in bytes */
+    public final Gauge<Long> capacityInBytes;
+    /** Total number of cache hits */
+    public final Meter hits;
+    /** Total number of cache requests */
+    public final Meter requests;
+    /** cache hit rate */
+    public final Gauge<Double> hitRate;
+    /** Total size of cache */
+    public final Gauge<Long> size;
+
+    private final AtomicLong lastRequests = new AtomicLong(0);
+    private final AtomicLong lastHits = new AtomicLong(0);
+
+    /**
+     * Create metrics for given cache.
+     *
+     * @param type Type of Cache to identify metrics.
+     * @param cache Cache to measure metrics
+     */
+    public CacheMetrics(String type, final ICache cache)
+    {
+        capacityInBytes = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CapacityInBytes", type), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return cache.capacity();
+            }
+        });
+        hits = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Hits", type), "hits", TimeUnit.SECONDS);
+        requests = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Requests", type), "requests", TimeUnit.SECONDS);
+        hitRate = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "HitRate", type), new RatioGauge()
+        {
+            protected double getNumerator()
+            {
+                return hits.count();
+            }
+
+            protected double getDenominator()
+            {
+                return requests.count();
+            }
+        });
+        size = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "Size", type), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return cache.weightedSize();
+            }
+        });
+    }
+
+    // for backward compatibility
+    @Deprecated
+    public double getRecentHitRate()
+    {
+        long r = requests.count();
+        long h = hits.count();
+        try
+        {
+            return ((double)(h - lastHits.get())) / (r - lastRequests.get());
+        }
+        finally
+        {
+            lastRequests.set(r);
+            lastHits.set(h);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 02ca0fe..1e9a3b9 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -20,13 +20,35 @@
  */
 package org.apache.cassandra.metrics;
 
+import java.util.concurrent.TimeUnit;
+
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
 
-public class ClientRequestMetrics
+public class ClientRequestMetrics extends LatencyMetrics
 {
-    public static final Counter readTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "ReadTimeouts");
-    public static final Counter writeTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "WriteTimeouts");
-    public static final Counter readUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "ReadUnavailables");
-    public static final Counter writeUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "WriteUnavailables");
+    @Deprecated public static final Counter readTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "ReadTimeouts");
+    @Deprecated public static final Counter writeTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "WriteTimeouts");
+    @Deprecated public static final Counter readUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "ReadUnavailables");
+    @Deprecated public static final Counter writeUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "WriteUnavailables");
+
+    public final Meter timeouts;
+    public final Meter unavailables;
+
+    public ClientRequestMetrics(String scope)
+    {
+        super("org.apache.cassandra.metrics", "ClientRequest", scope);
+
+        timeouts = Metrics.newMeter(factory.createMetricName("Timeouts"), "timeouts", TimeUnit.SECONDS);
+        unavailables = Metrics.newMeter(factory.createMetricName("Unavailables"), "unavailables", TimeUnit.SECONDS);
+    }
+
+    public void release()
+    {
+        super.release();
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("Timeouts"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("Unavailables"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
new file mode 100644
index 0000000..e206daf
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * Metrics for {@link ColumnFamilyStore}.
+ */
+public class ColumnFamilyMetrics
+{
+    /** Total amount of data stored in the memtable, including column related overhead. */
+    public final Gauge<Long> memtableDataSize;
+    /** Total number of columns present in the memtable. */
+    public final Gauge<Long> memtableColumnsCount;
+    /** Number of times flush has resulted in the memtable being switched out. */
+    public final Counter memtableSwitchCount;
+    /** Current compression ratio for all SSTables */
+    public final Gauge<Double> compressionRatio;
+    /** Histogram of estimated row size (in bytes). */
+    public final Gauge<long[]> estimatedRowSizeHistogram;
+    /** Histogram of estimated number of columns. */
+    public final Gauge<long[]> estimatedColumnCountHistogram;
+    /** Histogram of the number of sstable data files accessed per read */
+    public final Histogram sstablesPerReadHistogram;
+    /** Read metrics */
+    public final LatencyMetrics readLatency;
+    /** Write metrics */
+    public final LatencyMetrics writeLatency;
+    /** Estimated number of tasks pending for this column family */
+    public final Gauge<Integer> pendingTasks;
+    /** Number of SSTables on disk for this CF */
+    public final Gauge<Integer> liveSSTableCount;
+    /** Disk space used by SSTables belonging to this CF */
+    public final Counter liveDiskSpaceUsed;
+    /** Total disk space used by SSTables belonging to this CF, including obsolete ones waiting to be GC'd */
+    public final Counter totalDiskSpaceUsed;
+    /** Size of the smallest compacted row */
+    public final Gauge<Long> minRowSize;
+    /** Size of the largest compacted row */
+    public final Gauge<Long> maxRowSize;
+    /** Size of the smallest compacted row */
+    public final Gauge<Long> meanRowSize;
+    /** Number of false positives in bloom filter */
+    public final Gauge<Long> bloomFilterFalsePositives;
+    /** Number of false positives in bloom filter from last read */
+    public final Gauge<Long> recentBloomFilterFalsePositives;
+    /** False positive ratio of bloom filter */
+    public final Gauge<Double> bloomFilterFalseRatio;
+    /** False positive ratio of bloom filter from last read */
+    public final Gauge<Double> recentBloomFilterFalseRatio;
+    /** Disk space used by bloom filter */
+    public final Gauge<Long> bloomFilterDiskSpaceUsed;
+
+    private final MetricNameFactory factory;
+
+    // for backward compatibility
+    @Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
+    @Deprecated public final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
+
+    /**
+     * Creates metrics for given {@link ColumnFamilyStore}.
+     *
+     * @param cfs ColumnFamilyStore to measure metrics
+     */
+    public ColumnFamilyMetrics(final ColumnFamilyStore cfs)
+    {
+        factory = new ColumnFamilyMetricNameFactory(cfs);
+
+        memtableColumnsCount = Metrics.newGauge(factory.createMetricName("MemtableColumnsCount"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return cfs.getDataTracker().getMemtable().getOperations();
+            }
+        });
+        memtableDataSize = Metrics.newGauge(factory.createMetricName("MemtableDataSize"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return cfs.getDataTracker().getMemtable().getLiveSize();
+            }
+        });
+        memtableSwitchCount = Metrics.newCounter(factory.createMetricName("MemtableSwitchCount"));
+        estimatedRowSizeHistogram = Metrics.newGauge(factory.createMetricName("EstimatedRowSizeHistogram"), new Gauge<long[]>()
+        {
+            public long[] value()
+            {
+                long[] histogram = new long[90];
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
+                    for (int i = 0; i < histogram.length; i++)
+                        histogram[i] += rowSize[i];
+                }
+                return histogram;
+            }
+        });
+        estimatedColumnCountHistogram = Metrics.newGauge(factory.createMetricName("EstimatedColumnCountHistogram"), new Gauge<long[]>()
+        {
+            public long[] value()
+            {
+                long[] histogram = new long[90];
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
+                    for (int i = 0; i < histogram.length; i++)
+                        histogram[i] += columnSize[i];
+                }
+                return histogram;
+            }
+        });
+        sstablesPerReadHistogram = Metrics.newHistogram(factory.createMetricName("SSTablesPerReadHistogram"));
+        compressionRatio = Metrics.newGauge(factory.createMetricName("CompressionRatio"), new Gauge<Double>()
+        {
+            public Double value()
+            {
+                double sum = 0;
+                int total = 0;
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    if (sstable.getCompressionRatio() != Double.MIN_VALUE)
+                    {
+                        sum += sstable.getCompressionRatio();
+                        total++;
+                    }
+                }
+                return total != 0 ? (double)sum/total: 0;
+            }
+        });
+        readLatency = new LatencyMetrics(factory, "Read");
+        writeLatency = new LatencyMetrics(factory, "Write");
+        pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                // TODO this actually isn't a good measure of pending tasks
+                return Table.switchLock.getQueueLength();
+            }
+        });
+        liveSSTableCount = Metrics.newGauge(factory.createMetricName("LiveSSTableCount"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                return cfs.getDataTracker().getSSTables().size();
+            }
+        });
+        liveDiskSpaceUsed = Metrics.newCounter(factory.createMetricName("LiveDiskSpaceUsed"));
+        totalDiskSpaceUsed = Metrics.newCounter(factory.createMetricName("TotalDiskSpaceUsed"));
+        minRowSize = Metrics.newGauge(factory.createMetricName("MinRowSize"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long min = 0;
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    if (min == 0 || sstable.getEstimatedRowSize().min() < min)
+                        min = sstable.getEstimatedRowSize().min();
+                }
+                return min;
+            }
+        });
+        maxRowSize = Metrics.newGauge(factory.createMetricName("MaxRowSize"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long max = 0;
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    if (sstable.getEstimatedRowSize().max() > max)
+                        max = sstable.getEstimatedRowSize().max();
+                }
+                return max;
+            }
+        });
+        meanRowSize = Metrics.newGauge(factory.createMetricName("MeanRowSize"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long sum = 0;
+                long count = 0;
+                for (SSTableReader sstable : cfs.getSSTables())
+                {
+                    sum += sstable.getEstimatedRowSize().mean();
+                    count++;
+                }
+                return count > 0 ? sum / count : 0;
+            }
+        });
+        bloomFilterFalsePositives = Metrics.newGauge(factory.createMetricName("BloomFilterFalsePositives"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long count = 0L;
+                for (SSTableReader sstable: cfs.getSSTables())
+                    count += sstable.getBloomFilterFalsePositiveCount();
+                return count;
+            }
+        });
+        recentBloomFilterFalsePositives = Metrics.newGauge(factory.createMetricName("RecentBloomFilterFalsePositives"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long count = 0L;
+                for (SSTableReader sstable: cfs.getSSTables())
+                    count += sstable.getRecentBloomFilterFalsePositiveCount();
+                return count;
+            }
+        });
+        bloomFilterFalseRatio = Metrics.newGauge(factory.createMetricName("BloomFilterFalseRatio"), new Gauge<Double>()
+        {
+            public Double value()
+            {
+                long falseCount = 0L;
+                long trueCount = 0L;
+                for (SSTableReader sstable: cfs.getSSTables())
+                {
+                    falseCount += sstable.getBloomFilterFalsePositiveCount();
+                    trueCount += sstable.getBloomFilterTruePositiveCount();
+                }
+                if (falseCount == 0L && trueCount == 0L)
+                    return 0d;
+                return (double) falseCount / (trueCount + falseCount);
+            }
+        });
+        recentBloomFilterFalseRatio = Metrics.newGauge(factory.createMetricName("RecentBloomFilterFalseRatio"), new Gauge<Double>()
+        {
+            public Double value()
+            {
+                long falseCount = 0L;
+                long trueCount = 0L;
+                for (SSTableReader sstable: cfs.getSSTables())
+                {
+                    falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
+                    trueCount += sstable.getRecentBloomFilterTruePositiveCount();
+                }
+                if (falseCount == 0L && trueCount == 0L)
+                    return 0d;
+                return (double) falseCount / (trueCount + falseCount);
+            }
+        });
+        bloomFilterDiskSpaceUsed = Metrics.newGauge(factory.createMetricName("BloomFilterDiskSpaceUsed"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long total = 0;
+                for (SSTableReader sst : cfs.getSSTables())
+                    total += sst.getBloomFilterSerializedSize();
+                return total;
+            }
+        });
+    }
+
+    public void updateSSTableIterated(int count)
+    {
+        sstablesPerReadHistogram.update(count);
+        recentSSTablesPerRead.add(count);
+        sstablesPerRead.add(count);
+    }
+
+    /**
+     * Release all associated metrics.
+     */
+    public void release()
+    {
+        readLatency.release();
+        writeLatency.release();
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableColumnsCount"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableDataSize"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableSwitchCount"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CompressionRatio"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("EstimatedRowSizeHistogram"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("EstimatedColumnCountHistogram"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("SSTablesPerReadHistogram"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("PendingTasks"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("LiveSSTableCount"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("LiveDiskSpaceUsed"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalDiskSpaceUsed"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MinRowSize"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MaxRowSize"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("MeanRowSize"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterFalsePositives"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("RecentBloomFilterFalsePositives"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterFalseRatio"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("RecentBloomFilterFalseRatio"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterDiskSpaceUsed"));
+    }
+
+    class ColumnFamilyMetricNameFactory implements MetricNameFactory
+    {
+        private final String keyspaceName;
+        private final String columnFamilyName;
+        private final boolean isIndex;
+
+        ColumnFamilyMetricNameFactory(ColumnFamilyStore cfs)
+        {
+            this.keyspaceName = cfs.table.name;
+            this.columnFamilyName = cfs.getColumnFamilyName();
+            isIndex = cfs.isIndex();
+        }
+
+        public MetricName createMetricName(String metricName)
+        {
+            String groupName = ColumnFamilyMetrics.class.getPackage().getName();
+            String type = isIndex ? "IndexColumnFamily" : "ColumnFamily";
+
+            StringBuilder mbeanName = new StringBuilder();
+            mbeanName.append(groupName).append(":");
+            mbeanName.append("type=").append(type);
+            mbeanName.append(",keyspace=").append(keyspaceName);
+            mbeanName.append(",scope=").append(columnFamilyName);
+            mbeanName.append(",name=").append(metricName);
+
+            return new MetricName(groupName, type, metricName, keyspaceName + "." + columnFamilyName, mbeanName.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
new file mode 100644
index 0000000..598d295
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.db.commitlog.CommitLogAllocator;
+import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
+
+/**
+ * Metrics for commit log
+ */
+public class CommitLogMetrics
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "CommitLog";
+
+    /** Number of completed tasks */
+    public final Gauge<Long> completedTasks;
+    /** Number of pending tasks */
+    public final Gauge<Long> pendingTasks;
+    /** Current size used by all the commit log segments */
+    public final Gauge<Long> totalCommitLogSize;
+
+    public CommitLogMetrics(final ICommitLogExecutorService executor, final CommitLogAllocator allocator)
+    {
+        completedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getCompletedTasks();
+            }
+        });
+        pendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "PendingTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return executor.getPendingTasks();
+            }
+        });
+        totalCommitLogSize = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "TotalCommitLogSize"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                return allocator.bytesUsed();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
new file mode 100644
index 0000000..ae098ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+
+/**
+ * Metrics for compaction.
+ */
+public class CompactionMetrics implements CompactionManager.CompactionExecutorStatsCollector
+{
+    public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+    public static final String TYPE_NAME = "Compaction";
+
+    // a synchronized identity set of running tasks to their compaction info
+    private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
+
+    /** Estimated number of compactions remaining to perform */
+    public final Gauge<Integer> pendingTasks;
+    /** Number of completed compactions since server [re]start */
+    public final Gauge<Long> completedTasks;
+    /** Total number of compactions since server [re]start */
+    public final Meter totalCompactionsCompleted;
+    /** Total number of bytes compacted since server [re]start */
+    public final Counter bytesCompacted;
+
+    public CompactionMetrics(final ThreadPoolExecutor... collectors)
+    {
+        pendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "PendingTasks"), new Gauge<Integer>()
+        {
+            public Integer value()
+            {
+                int n = 0;
+                for (String tableName : Schema.instance.getTables())
+                {
+                    for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
+                        n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+                }
+                for (ThreadPoolExecutor collector : collectors)
+                    n += collector.getTaskCount() - collector.getCompletedTaskCount();
+                return n;
+            }
+        });
+        completedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CompletedTasks"), new Gauge<Long>()
+        {
+            public Long value()
+            {
+                long completedTasks = 0;
+                for (ThreadPoolExecutor collector : collectors)
+                    completedTasks += collector.getCompletedTaskCount();
+                return completedTasks;
+            }
+        });
+        totalCompactionsCompleted = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalCompactionsCompleted"), "compaction completed", TimeUnit.SECONDS);
+        bytesCompacted = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "BytesCompacted"));
+    }
+
+    public void beginCompaction(CompactionInfo.Holder ci)
+    {
+        // notify
+        ci.started();
+        compactions.add(ci);
+    }
+
+    public void finishCompaction(CompactionInfo.Holder ci)
+    {
+        // notify
+        ci.finished();
+        compactions.remove(ci);
+        bytesCompacted.inc(ci.getCompactionInfo().getTotal());
+        totalCompactionsCompleted.mark();
+    }
+
+    public static List<CompactionInfo.Holder> getCompactions()
+    {
+        return new ArrayList<CompactionInfo.Holder>(compactions);
+    }
+}