Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/07/16 20:48:36 UTC

[3/3] cassandra git commit: Metrics should use up to date nomenclature

Metrics should use up to date nomenclature

patch by Stefania Alborghetti; reviewed by yukim for CASSANDRA-9448


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0bd5170c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0bd5170c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0bd5170c

Branch: refs/heads/trunk
Commit: 0bd5170c4ff55c9f3307602de031d77883731883
Parents: 4feaa7a
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Jul 16 13:33:28 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Jul 16 13:47:50 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  41 +-
 .../cassandra/db/ColumnFamilyStoreMBean.java    |   3 +
 .../cassandra/db/PartitionRangeReadCommand.java |   4 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   9 +-
 .../db/SinglePartitionNamesCommand.java         |   4 +-
 .../db/SinglePartitionReadCommand.java          |  12 +-
 .../db/SinglePartitionSliceCommand.java         |   4 +-
 .../cassandra/db/SizeEstimatesRecorder.java     |   4 +-
 .../io/sstable/format/SSTableReader.java        |   8 +-
 .../metadata/LegacyMetadataSerializer.java      |   6 +-
 .../io/sstable/metadata/StatsMetadata.java      |  22 +-
 .../metrics/CassandraMetricsRegistry.java       |  61 ++
 .../cassandra/metrics/ColumnFamilyMetrics.java  | 797 ------------------
 .../cassandra/metrics/KeyspaceMetrics.java      |  45 +-
 .../cassandra/metrics/LatencyMetrics.java       |  33 +-
 .../apache/cassandra/metrics/TableMetrics.java  | 837 +++++++++++++++++++
 .../cassandra/service/StorageService.java       | 114 +--
 .../cassandra/service/StorageServiceMBean.java  |  58 +-
 .../service/pager/MultiPartitionPager.java      |   2 +-
 .../cassandra/thrift/CassandraServer.java       |   2 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  88 +-
 .../org/apache/cassandra/tools/NodeTool.java    |   2 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |   4 +-
 .../cassandra/tools/nodetool/Cleanup.java       |   4 +-
 .../cassandra/tools/nodetool/Compact.java       |   4 +-
 .../tools/nodetool/DisableAutoCompaction.java   |   4 +-
 .../tools/nodetool/EnableAutoCompaction.java    |   4 +-
 .../apache/cassandra/tools/nodetool/Flush.java  |   4 +-
 .../apache/cassandra/tools/nodetool/Repair.java |   2 +-
 .../apache/cassandra/tools/nodetool/Scrub.java  |   4 +-
 .../cassandra/tools/nodetool/Snapshot.java      |   2 +-
 .../tools/nodetool/TableHistograms.java         |  18 +-
 .../cassandra/tools/nodetool/TableStats.java    |   8 +-
 .../cassandra/tools/nodetool/TopPartitions.java |   6 +-
 .../tools/nodetool/UpgradeSSTable.java          |   4 +-
 .../apache/cassandra/tools/nodetool/Verify.java |   4 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  30 +-
 .../db/lifecycle/LifecycleTransactionTest.java  |   1 -
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../service/StorageServiceServerTest.java       |   4 +-
 41 files changed, 1204 insertions(+), 1062 deletions(-)
----------------------------------------------------------------------
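
In short, the patch renames "column family" and "row" era identifiers to the current "table" and "partition" terms: ColumnFamilyMetrics becomes TableMetrics, the ColumnFamilyStore MBean moves from type=ColumnFamilies/IndexColumnFamilies to type=Tables/IndexTables (with the old ObjectName kept as a deprecated alias), the rowCache* metrics become partitionCache*, and estimatedRowSize becomes estimatedPartitionSize. The snippet below is not part of the commit; it is a minimal sketch, assuming a JVM with access to the platform MBean server and an example system/local table, showing that after this change the same table MBean is reachable under both the new and the deprecated ObjectName:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class TableMBeanNames
    {
        public static void main(String[] args) throws Exception
        {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

            // New nomenclature introduced by this patch ...
            ObjectName current = new ObjectName(
                "org.apache.cassandra.db:type=Tables,keyspace=system,table=local");
            // ... and the deprecated alias kept for backwards compatibility.
            ObjectName legacy = new ObjectName(
                "org.apache.cassandra.db:type=ColumnFamilies,keyspace=system,columnfamily=local");

            System.out.println("registered under new name: " + mbs.isRegistered(current));
            System.out.println("registered under old name: " + mbs.isRegistered(legacy));
        }
    }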


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5104bab..4e2c22e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Metrics should use up to date nomenclature (CASSANDRA-9448)
  * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
  * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
  * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 845a4fc..03c8ca7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.utils.memory.MemtablePool;
 import org.json.simple.*;
 import org.slf4j.Logger;
@@ -67,8 +68,7 @@ import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamLockfile;
@@ -145,6 +145,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final CFMetaData metadata;
     public final IPartitioner partitioner;
     private final String mbeanName;
+    @Deprecated
+    private final String oldMBeanName;
     private volatile boolean valid = true;
 
     /**
@@ -172,7 +174,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public final Directories directories;
 
-    public final ColumnFamilyMetrics metric;
+    public final TableMetrics metric;
     public volatile long sampleLatencyNanos;
     private final ScheduledFuture<?> latencyCalculator;
 
@@ -335,7 +337,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.partitioner = partitioner;
         this.directories = directories;
         this.indexManager = new SecondaryIndexManager(this);
-        this.metric = new ColumnFamilyMetrics(this);
+        this.metric = new TableMetrics(this);
         fileIndexGenerator.set(generation);
         sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
 
@@ -375,13 +377,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (registerBookkeeping)
         {
             // register the mbean
-            String type = this.partitioner instanceof LocalPartitioner ? "IndexColumnFamilies" : "ColumnFamilies";
-            mbeanName = "org.apache.cassandra.db:type=" + type + ",keyspace=" + this.keyspace.getName() + ",columnfamily=" + name;
+            mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
+                                         isIndex() ? "IndexTables" : "Tables",
+                                         keyspace.getName(), name);
+            oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
+                                         isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
+                                         keyspace.getName(), name);
             try
             {
                 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                ObjectName nameObj = new ObjectName(mbeanName);
-                mbs.registerMBean(this, nameObj);
+                ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
+                for (ObjectName objectName : objectNames)
+                {
+                    mbs.registerMBean(this, objectName);
+                }
             }
             catch (Exception e)
             {
@@ -414,6 +423,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             latencyCalculator = ScheduledExecutors.optionalTasks.schedule(Runnables.doNothing(), 0, TimeUnit.NANOSECONDS);
             mbeanName = null;
+            oldMBeanName= null;
         }
     }
 
@@ -463,9 +473,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        ObjectName nameObj = new ObjectName(mbeanName);
-        if (mbs.isRegistered(nameObj))
-            mbs.unregisterMBean(nameObj);
+        ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
+        for (ObjectName objectName : objectNames)
+        {
+            if (mbs.isRegistered(objectName))
+                mbs.unregisterMBean(objectName);
+        }
 
         // unregister metrics
         metric.release();
@@ -824,8 +837,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    @Deprecated
     public String getColumnFamilyName()
     {
+        return getTableName();
+    }
+
+    public String getTableName()
+    {
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 6f613b6..c23df74 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -32,8 +32,11 @@ public interface ColumnFamilyStoreMBean
     /**
      * @return the name of the column family
      */
+    @Deprecated
     public String getColumnFamilyName();
 
+    public String getTableName();
+
     /**
      * force a major compaction of this column family
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index c11a9be..65e38d0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
@@ -150,7 +150,7 @@ public class PartitionRangeReadCommand extends ReadCommand
             return new RangeSliceQueryPager(this, pagingState);
     }
 
-    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.rangeLatency.addNano(latencyNanos);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index b5182dd..99547f0 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -20,12 +20,9 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.IOException;
 
-import com.google.common.collect.Iterables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
@@ -35,7 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.ClientWarn;
@@ -301,7 +298,7 @@ public abstract class ReadCommand implements ReadQuery
         }
     }
 
-    protected abstract void recordLatency(ColumnFamilyMetrics metric, long latencyNanos);
+    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 
     public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
     {
@@ -317,7 +314,7 @@ public abstract class ReadCommand implements ReadQuery
      * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
      * This also log warning/trow TombstoneOverwhelmingException if appropriate.
      */
-    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final ColumnFamilyMetrics metric, final long startTimeNanos)
+    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
     {
         return new WrappingUnfilteredPartitionIterator(iter)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index c0f8207..cb43cd3 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.SearchIterator;
@@ -133,7 +133,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
             return UnfilteredRowIterators.emptyIterator(metadata(), partitionKey(), false);
 
         DecoratedKey key = result.partitionKey();
-        cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+        cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
 
         // "hoist up" the requested data into a more recent sstable
         if (sstablesIterated > cfs.getMinimumCompactionThreshold()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 3fa8486..f9f583f 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -215,7 +215,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         return new SinglePartitionPager(command, pagingState);
     }
 
-    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.readLatency.addNano(latencyNanos);
     }
@@ -256,24 +256,24 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
             {
                 // Some other read is trying to cache the value, just do a normal non-caching read
                 Tracing.trace("Row cache miss (race)");
-                cfs.metric.rowCacheMiss.inc();
+                cfs.metric.partitionCacheMiss.inc();
                 return queryMemtableAndDisk(cfs, readOp);
             }
 
             CachedPartition cachedPartition = (CachedPartition)cached;
             if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec()))
             {
-                cfs.metric.rowCacheHit.inc();
+                cfs.metric.partitionCacheHit.inc();
                 Tracing.trace("Row cache hit");
                 return clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition);
             }
 
-            cfs.metric.rowCacheHitOutOfRange.inc();
+            cfs.metric.partitionCacheHitOutOfRange.inc();
             Tracing.trace("Ignoring row cache as cached value could not satisfy query");
             return queryMemtableAndDisk(cfs, readOp);
         }
 
-        cfs.metric.rowCacheMiss.inc();
+        cfs.metric.partitionCacheMiss.inc();
         Tracing.trace("Row cache miss");
 
         boolean cacheFullPartitions = metadata().getCaching().rowCache.cacheFullPartitions();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
index 65b4e3f..d74dc4e 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
@@ -211,7 +211,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus
             if (!merged.isEmpty())
             {
                 DecoratedKey key = merged.partitionKey();
-                cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
             }
 
             return merged;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index c68109c..2b03c08 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -131,8 +131,8 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
         long sum = 0, count = 0;
         for (SSTableReader sstable : sstables)
         {
-            long n = sstable.getEstimatedRowSize().count();
-            sum += sstable.getEstimatedRowSize().mean() * n;
+            long n = sstable.getEstimatedPartitionSize().count();
+            sum += sstable.getEstimatedPartitionSize().mean() * n;
             count += n;
         }
         return count > 0 ? sum / count : 0;
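
Not part of the commit: the SizeEstimatesRecorder hunk above only renames the histogram accessor; the arithmetic stays a count-weighted mean, sum(mean_i * n_i) / sum(n_i) over the sstables. A tiny standalone sketch with hypothetical (count, mean) pairs:

    // Count-weighted mean, as computed above: sum(mean_i * n_i) / sum(n_i).
    // The {count, mean} pairs are hypothetical stand-ins for per-sstable histograms.
    public class WeightedMeanSketch
    {
        public static void main(String[] args)
        {
            long[][] histograms = { {100, 512}, {400, 2048}, {0, 0} }; // {count, mean}
            long sum = 0, count = 0;
            for (long[] h : histograms)
            {
                long n = h[0];
                sum += h[1] * n;
                count += n;
            }
            long meanPartitionSize = count > 0 ? sum / count : 0;
            System.out.println(meanPartitionSize); // prints 1740
        }
    }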

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 81ec24b..3d9fe82 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -786,8 +786,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         try (RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))))
         {
             long indexSize = primaryIndex.length();
-            long histogramCount = sstableMetadata.estimatedRowSize.count();
-            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
+            long histogramCount = sstableMetadata.estimatedPartitionSize.count();
+            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedPartitionSize.isOverflowed()
                     ? histogramCount
                     : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 
@@ -1825,9 +1825,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return keyCache;
     }
 
-    public EstimatedHistogram getEstimatedRowSize()
+    public EstimatedHistogram getEstimatedPartitionSize()
     {
-        return sstableMetadata.estimatedRowSize;
+        return sstableMetadata.estimatedPartitionSize;
     }
 
     public EstimatedHistogram getEstimatedColumnCount()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index ab048d6..433c31a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -51,7 +51,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
 
         assert validation != null && stats != null && compaction != null && validation.partitioner != null;
 
-        EstimatedHistogram.serializer.serialize(stats.estimatedRowSize, out);
+        EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
         EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
         ReplayPosition.serializer.serialize(stats.replayPosition, out);
         out.writeLong(stats.minTimestamp);
@@ -90,7 +90,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
         {
             try (DataInputStreamPlus in = new DataInputStreamPlus(new BufferedInputStream(new FileInputStream(statsFile))))
             {
-                EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+                EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
                 EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
                 ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
                 long minTimestamp = in.readLong();
@@ -123,7 +123,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                                    new ValidationMetadata(partitioner, bloomFilterFPChance));
                 if (types.contains(MetadataType.STATS))
                     components.put(MetadataType.STATS,
-                                   new StatsMetadata(rowSizes,
+                                   new StatsMetadata(partitionSizes,
                                                      columnCounts,
                                                      replayPosition,
                                                      minTimestamp,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index e5ec3fd..8153533 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -40,7 +40,7 @@ public class StatsMetadata extends MetadataComponent
 {
     public static final IMetadataComponentSerializer serializer = new StatsMetadataSerializer();
 
-    public final EstimatedHistogram estimatedRowSize;
+    public final EstimatedHistogram estimatedPartitionSize;
     public final EstimatedHistogram estimatedColumnCount;
     public final ReplayPosition replayPosition;
     public final long minTimestamp;
@@ -59,7 +59,7 @@ public class StatsMetadata extends MetadataComponent
     public final long totalColumnsSet;
     public final long totalRows;
 
-    public StatsMetadata(EstimatedHistogram estimatedRowSize,
+    public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
                          ReplayPosition replayPosition,
                          long minTimestamp,
@@ -78,7 +78,7 @@ public class StatsMetadata extends MetadataComponent
                          long totalColumnsSet,
                          long totalRows)
     {
-        this.estimatedRowSize = estimatedRowSize;
+        this.estimatedPartitionSize = estimatedPartitionSize;
         this.estimatedColumnCount = estimatedColumnCount;
         this.replayPosition = replayPosition;
         this.minTimestamp = minTimestamp;
@@ -129,7 +129,7 @@ public class StatsMetadata extends MetadataComponent
 
     public StatsMetadata mutateLevel(int newLevel)
     {
-        return new StatsMetadata(estimatedRowSize,
+        return new StatsMetadata(estimatedPartitionSize,
                                  estimatedColumnCount,
                                  replayPosition,
                                  minTimestamp,
@@ -151,7 +151,7 @@ public class StatsMetadata extends MetadataComponent
 
     public StatsMetadata mutateRepairedAt(long newRepairedAt)
     {
-        return new StatsMetadata(estimatedRowSize,
+        return new StatsMetadata(estimatedPartitionSize,
                                  estimatedColumnCount,
                                  replayPosition,
                                  minTimestamp,
@@ -179,7 +179,7 @@ public class StatsMetadata extends MetadataComponent
 
         StatsMetadata that = (StatsMetadata) o;
         return new EqualsBuilder()
-                       .append(estimatedRowSize, that.estimatedRowSize)
+                       .append(estimatedPartitionSize, that.estimatedPartitionSize)
                        .append(estimatedColumnCount, that.estimatedColumnCount)
                        .append(replayPosition, that.replayPosition)
                        .append(minTimestamp, that.minTimestamp)
@@ -204,7 +204,7 @@ public class StatsMetadata extends MetadataComponent
     public int hashCode()
     {
         return new HashCodeBuilder()
-                       .append(estimatedRowSize)
+                       .append(estimatedPartitionSize)
                        .append(estimatedColumnCount)
                        .append(replayPosition)
                        .append(minTimestamp)
@@ -230,7 +230,7 @@ public class StatsMetadata extends MetadataComponent
         public int serializedSize(StatsMetadata component) throws IOException
         {
             int size = 0;
-            size += EstimatedHistogram.serializer.serializedSize(component.estimatedRowSize);
+            size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
             size += ReplayPosition.serializer.serializedSize(component.replayPosition);
             size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
@@ -251,7 +251,7 @@ public class StatsMetadata extends MetadataComponent
 
         public void serialize(StatsMetadata component, DataOutputPlus out) throws IOException
         {
-            EstimatedHistogram.serializer.serialize(component.estimatedRowSize, out);
+            EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
             EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
             ReplayPosition.serializer.serialize(component.replayPosition, out);
             out.writeLong(component.minTimestamp);
@@ -278,7 +278,7 @@ public class StatsMetadata extends MetadataComponent
 
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
         {
-            EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
+            EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
             ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
@@ -312,7 +312,7 @@ public class StatsMetadata extends MetadataComponent
             long totalColumnsSet = version.storeRows() ? in.readLong() : -1L;
             long totalRows = version.storeRows() ? in.readLong() : -1L;
 
-            return new StatsMetadata(rowSizes,
+            return new StatsMetadata(partitionSizes,
                                      columnCounts,
                                      replayPosition,
                                      minTimestamp,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 26b9c07..f7adb79 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -50,6 +50,13 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return counter;
     }
 
+    public Counter counter(MetricName name, MetricName alias)
+    {
+        Counter counter = counter(name);
+        registerAlias(name, alias);
+        return counter;
+    }
+
     public Meter meter(MetricName name)
     {
         Meter meter = meter(name.getMetricName());
@@ -58,6 +65,13 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return meter;
     }
 
+    public Meter meter(MetricName name, MetricName alias)
+    {
+        Meter meter = meter(name);
+        registerAlias(name, alias);
+        return meter;
+    }
+
     public Histogram histogram(MetricName name)
     {
         Histogram histogram = register(name, new ClearableHistogram(new EstimatedHistogramReservoir()));
@@ -66,6 +80,13 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return histogram;
     }
 
+    public Histogram histogram(MetricName name, MetricName alias)
+    {
+        Histogram histogram = histogram(name);
+        registerAlias(name, alias);
+        return histogram;
+    }
+
     public Timer timer(MetricName name)
     {
         Timer timer = register(name, new Timer(new EstimatedHistogramReservoir()));
@@ -74,6 +95,13 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return timer;
     }
 
+    public Timer timer(MetricName name, MetricName alias)
+    {
+        Timer timer = timer(name);
+        registerAlias(name, alias);
+        return timer;
+    }
+
     public <T extends Metric> T register(MetricName name, T metric)
     {
         try
@@ -89,6 +117,13 @@ public class CassandraMetricsRegistry extends MetricRegistry
         }
     }
 
+    public <T extends Metric> T register(MetricName name, MetricName aliasName, T metric)
+    {
+        T ret = register(name, metric);
+        registerAlias(name, aliasName);
+        return ret;
+    }
+
     public boolean remove(MetricName name)
     {
         boolean removed = remove(name.getMetricName());
@@ -101,6 +136,16 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return removed;
     }
 
+    public boolean remove(MetricName name, MetricName alias)
+    {
+        if (remove(name))
+        {
+            removeAlias(alias);
+            return true;
+        }
+        return false;
+    }
+
     public void registerMBean(Metric metric, ObjectName name)
     {
         AbstractBean mbean;
@@ -131,6 +176,22 @@ public class CassandraMetricsRegistry extends MetricRegistry
         } catch (Exception ignored) {}
     }
 
+    private void registerAlias(MetricName existingName, MetricName aliasName)
+    {
+        Metric existing = Metrics.getMetrics().get(existingName.getMetricName());
+        assert existing != null : existingName + " not registered";
+
+        registerMBean(existing, aliasName.getMBeanName());
+    }
+
+    private void removeAlias(MetricName name)
+    {
+        try
+        {
+            mBeanServer.unregisterMBean(name.getMBeanName());
+        } catch (Exception ignore) {}
+    }
+
     public interface MetricMBean
     {
         ObjectName objectName();
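
Not part of the commit: the alias overloads added above (counter/meter/histogram/timer/register/remove taking a second MetricName) register the metric once under its new name and then expose the same Metric MBean under the legacy ObjectName via registerAlias. A minimal, standalone sketch of that dual-exposure pattern using only standard JMX follows; the CounterMBean interface and the org.example ObjectNames are hypothetical stand-ins for Cassandra's codahale-backed MBeans:

    import java.lang.management.ManagementFactory;
    import java.util.concurrent.atomic.AtomicLong;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class AliasRegistrationSketch
    {
        // Hypothetical standard MBean interface; Cassandra wraps codahale metrics instead.
        public interface CounterMBean { long getCount(); }

        public static class Counter implements CounterMBean
        {
            private final AtomicLong count = new AtomicLong();
            public void inc() { count.incrementAndGet(); }
            public long getCount() { return count.get(); }
        }

        public static void main(String[] args) throws Exception
        {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            Counter counter = new Counter();

            // One metric instance, two ObjectNames: the current name and a deprecated alias.
            ObjectName current = new ObjectName("org.example.metrics:type=Table,name=PendingFlushes");
            ObjectName alias   = new ObjectName("org.example.metrics:type=ColumnFamily,name=PendingFlushes");
            mbs.registerMBean(counter, current);
            mbs.registerMBean(counter, alias);

            counter.inc();
            System.out.println(mbs.getAttribute(current, "Count")); // 1
            System.out.println(mbs.getAttribute(alias, "Count"));   // 1
        }
    }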

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
deleted file mode 100644
index 6ad50d3..0000000
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ /dev/null
@@ -1,797 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.metrics;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.codahale.metrics.*;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.utils.EstimatedHistogram;
-import org.apache.cassandra.utils.TopKSampler;
-
-import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
-
-/**
- * Metrics for {@link ColumnFamilyStore}.
- */
-public class ColumnFamilyMetrics
-{
-
-    /** Total amount of data stored in the memtable that resides on-heap, including column related overhead and overwritten rows. */
-    public final Gauge<Long> memtableOnHeapSize;
-    /** Total amount of data stored in the memtable that resides off-heap, including column related overhead and overwritten rows. */
-    public final Gauge<Long> memtableOffHeapSize;
-    /** Total amount of live data stored in the memtable, excluding any data structure overhead */
-    public final Gauge<Long> memtableLiveDataSize;
-    /** Total amount of data stored in the memtables (2i and pending flush memtables included) that resides on-heap. */
-    public final Gauge<Long> allMemtablesOnHeapSize;
-    /** Total amount of data stored in the memtables (2i and pending flush memtables included) that resides off-heap. */
-    public final Gauge<Long> allMemtablesOffHeapSize;
-    /** Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead */
-    public final Gauge<Long> allMemtablesLiveDataSize;
-    /** Total number of columns present in the memtable. */
-    public final Gauge<Long> memtableColumnsCount;
-    /** Number of times flush has resulted in the memtable being switched out. */
-    public final Counter memtableSwitchCount;
-    /** Current compression ratio for all SSTables */
-    public final Gauge<Double> compressionRatio;
-    /** Histogram of estimated row size (in bytes). */
-    public final Gauge<long[]> estimatedRowSizeHistogram;
-    /** Approximate number of keys in table. */
-    public final Gauge<Long> estimatedRowCount;
-    /** Histogram of estimated number of columns. */
-    public final Gauge<long[]> estimatedColumnCountHistogram;
-    /** Histogram of the number of sstable data files accessed per read */
-    public final ColumnFamilyHistogram sstablesPerReadHistogram;
-    /** (Local) read metrics */
-    public final LatencyMetrics readLatency;
-    /** (Local) range slice metrics */
-    public final LatencyMetrics rangeLatency;
-    /** (Local) write metrics */
-    public final LatencyMetrics writeLatency;
-    /** Estimated number of tasks pending for this column family */
-    public final Counter pendingFlushes;
-    /** Estimate of number of pending compactios for this CF */
-    public final Gauge<Integer> pendingCompactions;
-    /** Number of SSTables on disk for this CF */
-    public final Gauge<Integer> liveSSTableCount;
-    /** Disk space used by SSTables belonging to this CF */
-    public final Counter liveDiskSpaceUsed;
-    /** Total disk space used by SSTables belonging to this CF, including obsolete ones waiting to be GC'd */
-    public final Counter totalDiskSpaceUsed;
-    /** Size of the smallest compacted row */
-    public final Gauge<Long> minRowSize;
-    /** Size of the largest compacted row */
-    public final Gauge<Long> maxRowSize;
-    /** Size of the smallest compacted row */
-    public final Gauge<Long> meanRowSize;
-    /** Number of false positives in bloom filter */
-    public final Gauge<Long> bloomFilterFalsePositives;
-    /** Number of false positives in bloom filter from last read */
-    public final Gauge<Long> recentBloomFilterFalsePositives;
-    /** False positive ratio of bloom filter */
-    public final Gauge<Double> bloomFilterFalseRatio;
-    /** False positive ratio of bloom filter from last read */
-    public final Gauge<Double> recentBloomFilterFalseRatio;
-    /** Disk space used by bloom filter */
-    public final Gauge<Long> bloomFilterDiskSpaceUsed;
-    /** Off heap memory used by bloom filter */
-    public final Gauge<Long> bloomFilterOffHeapMemoryUsed;
-    /** Off heap memory used by index summary */
-    public final Gauge<Long> indexSummaryOffHeapMemoryUsed;
-    /** Off heap memory used by compression meta data*/
-    public final Gauge<Long> compressionMetadataOffHeapMemoryUsed;
-    /** Key cache hit rate  for this CF */
-    public final Gauge<Double> keyCacheHitRate;
-    /** Tombstones scanned in queries on this CF */
-    public final ColumnFamilyHistogram tombstoneScannedHistogram;
-    /** Live cells scanned in queries on this CF */
-    public final ColumnFamilyHistogram liveScannedHistogram;
-    /** Column update time delta on this CF */
-    public final ColumnFamilyHistogram colUpdateTimeDeltaHistogram;
-    /** Disk space used by snapshot files which */
-    public final Gauge<Long> trueSnapshotsSize;
-    /** Row cache hits, but result out of range */
-    public final Counter rowCacheHitOutOfRange;
-    /** Number of row cache hits */
-    public final Counter rowCacheHit;
-    /** Number of row cache misses */
-    public final Counter rowCacheMiss;
-    /** CAS Prepare metrics */
-    public final LatencyMetrics casPrepare;
-    /** CAS Propose metrics */
-    public final LatencyMetrics casPropose;
-    /** CAS Commit metrics */
-    public final LatencyMetrics casCommit;
-
-    public final Timer coordinatorReadLatency;
-    public final Timer coordinatorScanLatency;
-
-    /** Time spent waiting for free memtable space, either on- or off-heap */
-    public final Histogram waitingOnFreeMemtableSpace;
-
-    private final MetricNameFactory factory;
-    private static final MetricNameFactory globalNameFactory = new AllColumnFamilyMetricNameFactory();
-
-    public final Counter speculativeRetries;
-
-    public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalNameFactory, "Read");
-    public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalNameFactory, "Write");
-    public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalNameFactory, "Range");
-    
-    public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
-    /**
-     * stores metrics that will be rolled into a single global metric
-     */
-    public final static ConcurrentMap<String, Set<Metric>> allColumnFamilyMetrics = Maps.newConcurrentMap();
-    
-    /**
-     * Stores all metric names created that can be used when unregistering
-     */
-    public final static Set<String> all = Sets.newHashSet();
-
-    private interface GetHistogram
-    {
-        public EstimatedHistogram getHistogram(SSTableReader reader);
-    }
-
-    private static long[] combineHistograms(Iterable<SSTableReader> sstables, GetHistogram getHistogram)
-    {
-        Iterator<SSTableReader> iterator = sstables.iterator();
-        if (!iterator.hasNext())
-        {
-            return new long[0];
-        }
-        long[] firstBucket = getHistogram.getHistogram(iterator.next()).getBuckets(false);
-        long[] values = new long[firstBucket.length];
-        System.arraycopy(firstBucket, 0, values, 0, values.length);
-
-        while (iterator.hasNext())
-        {
-            long[] nextBucket = getHistogram.getHistogram(iterator.next()).getBuckets(false);
-            if (nextBucket.length > values.length)
-            {
-                long[] newValues = new long[nextBucket.length];
-                System.arraycopy(firstBucket, 0, newValues, 0, firstBucket.length);
-                for (int i = 0; i < newValues.length; i++)
-                {
-                    newValues[i] += nextBucket[i];
-                }
-                values = newValues;
-            }
-            else
-            {
-                for (int i = 0; i < values.length; i++)
-                {
-                    values[i] += nextBucket[i];
-                }
-            }
-        }
-        return values;
-    }
-    
-    /**
-     * Creates metrics for given {@link ColumnFamilyStore}.
-     *
-     * @param cfs ColumnFamilyStore to measure metrics
-     */
-    public ColumnFamilyMetrics(final ColumnFamilyStore cfs)
-    {
-        factory = new ColumnFamilyMetricNameFactory(cfs);
-
-        samplers = Maps.newHashMap();
-        for (Sampler sampler : Sampler.values())
-        {
-            samplers.put(sampler, new TopKSampler<ByteBuffer>());
-        }
-
-        memtableColumnsCount = createColumnFamilyGauge("MemtableColumnsCount", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.getTracker().getView().getCurrentMemtable().getOperations();
-            }
-        });
-        memtableOnHeapSize = createColumnFamilyGauge("MemtableOnHeapSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
-            }
-        });
-        memtableOffHeapSize = createColumnFamilyGauge("MemtableOffHeapSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
-            }
-        });
-        memtableLiveDataSize = createColumnFamilyGauge("MemtableLiveDataSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.getTracker().getView().getCurrentMemtable().getLiveDataSize();
-            }
-        });
-        allMemtablesOnHeapSize = createColumnFamilyGauge("AllMemtablesHeapSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long size = 0;
-                for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().onHeap().owns();
-                return size;
-            }
-        });
-        allMemtablesOffHeapSize = createColumnFamilyGauge("AllMemtablesOffHeapSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long size = 0;
-                for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getTracker().getView().getCurrentMemtable().getAllocator().offHeap().owns();
-                return size;
-            }
-        });
-        allMemtablesLiveDataSize = createColumnFamilyGauge("AllMemtablesLiveDataSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long size = 0;
-                for (ColumnFamilyStore cfs2 : cfs.concatWithIndexes())
-                    size += cfs2.getTracker().getView().getCurrentMemtable().getLiveDataSize();
-                return size;
-            }
-        });
-        memtableSwitchCount = createColumnFamilyCounter("MemtableSwitchCount");
-        estimatedRowSizeHistogram = Metrics.register(factory.createMetricName("EstimatedRowSizeHistogram"), new Gauge<long[]>()
-        {
-            public long[] getValue()
-            {
-                return combineHistograms(cfs.getSSTables(), new GetHistogram()
-                {
-                    public EstimatedHistogram getHistogram(SSTableReader reader)
-                    {
-                        return reader.getEstimatedRowSize();
-                    }
-                });
-            }
-        });
-        estimatedRowCount = Metrics.register(factory.createMetricName("EstimatedRowCount"), new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long memtablePartitions = 0;
-                for (Memtable memtable : cfs.getTracker().getView().getAllMemtables())
-                    memtablePartitions += memtable.partitionCount();
-                return SSTableReader.getApproximateKeyCount(cfs.getSSTables()) + memtablePartitions;
-            }
-        });
-        estimatedColumnCountHistogram = Metrics.register(factory.createMetricName("EstimatedColumnCountHistogram"), new Gauge<long[]>()
-        {
-            public long[] getValue()
-            {
-                return combineHistograms(cfs.getSSTables(), new GetHistogram()
-                {
-                    public EstimatedHistogram getHistogram(SSTableReader reader)
-                    {
-                        return reader.getEstimatedColumnCount();
-                    }
-                });
-            }
-        });
-        sstablesPerReadHistogram = createColumnFamilyHistogram("SSTablesPerReadHistogram", cfs.keyspace.metric.sstablesPerReadHistogram);
-        compressionRatio = createColumnFamilyGauge("CompressionRatio", new Gauge<Double>()
-        {
-            public Double getValue()
-            {
-                double sum = 0;
-                int total = 0;
-                for (SSTableReader sstable : cfs.getSSTables())
-                {
-                    if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO)
-                    {
-                        sum += sstable.getCompressionRatio();
-                        total++;
-                    }
-                }
-                return total != 0 ? sum / total : 0;
-            }
-        }, new Gauge<Double>() // global gauge
-        {
-            public Double getValue()
-            {
-                double sum = 0;
-                int total = 0;
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    for (SSTableReader sstable : keyspace.getAllSSTables())
-                    {
-                        if (sstable.getCompressionRatio() != MetadataCollector.NO_COMPRESSION_RATIO)
-                        {
-                            sum += sstable.getCompressionRatio();
-                            total++;
-                        }
-                    }
-                }
-                return total != 0 ? sum / total : 0;
-            }
-        });
-        readLatency = new LatencyMetrics(factory, "Read", cfs.keyspace.metric.readLatency, globalReadLatency);
-        writeLatency = new LatencyMetrics(factory, "Write", cfs.keyspace.metric.writeLatency, globalWriteLatency);
-        rangeLatency = new LatencyMetrics(factory, "Range", cfs.keyspace.metric.rangeLatency, globalRangeLatency);
-        pendingFlushes = createColumnFamilyCounter("PendingFlushes");
-        pendingCompactions = createColumnFamilyGauge("PendingCompactions", new Gauge<Integer>()
-        {
-            public Integer getValue()
-            {
-                return cfs.getCompactionStrategyManager().getEstimatedRemainingTasks();
-            }
-        });
-        liveSSTableCount = createColumnFamilyGauge("LiveSSTableCount", new Gauge<Integer>()
-        {
-            public Integer getValue()
-            {
-                return cfs.getTracker().getSSTables().size();
-            }
-        });
-        liveDiskSpaceUsed = createColumnFamilyCounter("LiveDiskSpaceUsed");
-        totalDiskSpaceUsed = createColumnFamilyCounter("TotalDiskSpaceUsed");
-        minRowSize = createColumnFamilyGauge("MinRowSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long min = 0;
-                for (SSTableReader sstable : cfs.getSSTables())
-                {
-                    if (min == 0 || sstable.getEstimatedRowSize().min() < min)
-                        min = sstable.getEstimatedRowSize().min();
-                }
-                return min;
-            }
-        }, new Gauge<Long>() // global gauge
-        {
-            public Long getValue()
-            {
-                long min = Long.MAX_VALUE;
-                for (Metric cfGauge : allColumnFamilyMetrics.get("MinRowSize"))
-                {
-                    min = Math.min(min, ((Gauge<? extends Number>) cfGauge).getValue().longValue());
-                }
-                return min;
-            }
-        });
-        maxRowSize = createColumnFamilyGauge("MaxRowSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long max = 0;
-                for (SSTableReader sstable : cfs.getSSTables())
-                {
-                    if (sstable.getEstimatedRowSize().max() > max)
-                        max = sstable.getEstimatedRowSize().max();
-                }
-                return max;
-            }
-        }, new Gauge<Long>() // global gauge
-        {
-            public Long getValue()
-            {
-                long max = 0;
-                for (Metric cfGauge : allColumnFamilyMetrics.get("MaxRowSize"))
-                {
-                    max = Math.max(max, ((Gauge<? extends Number>) cfGauge).getValue().longValue());
-                }
-                return max;
-            }
-        });
-        meanRowSize = createColumnFamilyGauge("MeanRowSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long sum = 0;
-                long count = 0;
-                for (SSTableReader sstable : cfs.getSSTables())
-                {
-                    long n = sstable.getEstimatedRowSize().count();
-                    sum += sstable.getEstimatedRowSize().mean() * n;
-                    count += n;
-                }
-                return count > 0 ? sum / count : 0;
-            }
-        }, new Gauge<Long>() // global gauge
-        {
-            public Long getValue()
-            {
-                long sum = 0;
-                long count = 0;
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    for (SSTableReader sstable : keyspace.getAllSSTables())
-                    {
-                        long n = sstable.getEstimatedRowSize().count();
-                        sum += sstable.getEstimatedRowSize().mean() * n;
-                        count += n;
-                    }
-                }
-                return count > 0 ? sum / count : 0;
-            }
-        });
-        bloomFilterFalsePositives = createColumnFamilyGauge("BloomFilterFalsePositives", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long count = 0L;
-                for (SSTableReader sstable: cfs.getSSTables())
-                    count += sstable.getBloomFilterFalsePositiveCount();
-                return count;
-            }
-        });
-        recentBloomFilterFalsePositives = createColumnFamilyGauge("RecentBloomFilterFalsePositives", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long count = 0L;
-                for (SSTableReader sstable : cfs.getSSTables())
-                    count += sstable.getRecentBloomFilterFalsePositiveCount();
-                return count;
-            }
-        });
-        bloomFilterFalseRatio = createColumnFamilyGauge("BloomFilterFalseRatio", new Gauge<Double>()
-        {
-            public Double getValue()
-            {
-                long falseCount = 0L;
-                long trueCount = 0L;
-                for (SSTableReader sstable : cfs.getSSTables())
-                {
-                    falseCount += sstable.getBloomFilterFalsePositiveCount();
-                    trueCount += sstable.getBloomFilterTruePositiveCount();
-                }
-                if (falseCount == 0L && trueCount == 0L)
-                    return 0d;
-                return (double) falseCount / (trueCount + falseCount);
-            }
-        }, new Gauge<Double>() // global gauge
-        {
-            public Double getValue()
-            {
-                long falseCount = 0L;
-                long trueCount = 0L;
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    for (SSTableReader sstable : keyspace.getAllSSTables())
-                    {
-                        falseCount += sstable.getBloomFilterFalsePositiveCount();
-                        trueCount += sstable.getBloomFilterTruePositiveCount();
-                    }
-                }
-                if (falseCount == 0L && trueCount == 0L)
-                    return 0d;
-                return (double) falseCount / (trueCount + falseCount);
-            }
-        });
-        recentBloomFilterFalseRatio = createColumnFamilyGauge("RecentBloomFilterFalseRatio", new Gauge<Double>()
-        {
-            public Double getValue()
-            {
-                long falseCount = 0L;
-                long trueCount = 0L;
-                for (SSTableReader sstable: cfs.getSSTables())
-                {
-                    falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
-                    trueCount += sstable.getRecentBloomFilterTruePositiveCount();
-                }
-                if (falseCount == 0L && trueCount == 0L)
-                    return 0d;
-                return (double) falseCount / (trueCount + falseCount);
-            }
-        }, new Gauge<Double>() // global gauge
-        {
-            public Double getValue()
-            {
-                long falseCount = 0L;
-                long trueCount = 0L;
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    for (SSTableReader sstable : keyspace.getAllSSTables())
-                    {
-                        falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
-                        trueCount += sstable.getRecentBloomFilterTruePositiveCount();
-                    }
-                }
-                if (falseCount == 0L && trueCount == 0L)
-                    return 0d;
-                return (double) falseCount / (trueCount + falseCount);
-            }
-        });
-        bloomFilterDiskSpaceUsed = createColumnFamilyGauge("BloomFilterDiskSpaceUsed", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long total = 0;
-                for (SSTableReader sst : cfs.getSSTables())
-                    total += sst.getBloomFilterSerializedSize();
-                return total;
-            }
-        });
-        bloomFilterOffHeapMemoryUsed = createColumnFamilyGauge("BloomFilterOffHeapMemoryUsed", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long total = 0;
-                for (SSTableReader sst : cfs.getSSTables())
-                    total += sst.getBloomFilterOffHeapSize();
-                return total;
-            }
-        });
-        indexSummaryOffHeapMemoryUsed = createColumnFamilyGauge("IndexSummaryOffHeapMemoryUsed", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long total = 0;
-                for (SSTableReader sst : cfs.getSSTables())
-                    total += sst.getIndexSummaryOffHeapSize();
-                return total;
-            }
-        });
-        compressionMetadataOffHeapMemoryUsed = createColumnFamilyGauge("CompressionMetadataOffHeapMemoryUsed", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long total = 0;
-                for (SSTableReader sst : cfs.getSSTables())
-                    total += sst.getCompressionMetadataOffHeapSize();
-                return total;
-            }
-        });
-        speculativeRetries = createColumnFamilyCounter("SpeculativeRetries");
-        keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"), new RatioGauge()
-        {
-            @Override
-            public Ratio getRatio()
-            {
-                return Ratio.of(getNumerator(), getDenominator());
-            }
-
-            protected double getNumerator()
-            {
-                long hits = 0L;
-                for (SSTableReader sstable : cfs.getSSTables())
-                    hits += sstable.getKeyCacheHit();
-                return hits;
-            }
-
-            protected double getDenominator()
-            {
-                long requests = 0L;
-                for (SSTableReader sstable : cfs.getSSTables())
-                    requests += sstable.getKeyCacheRequest();
-                return Math.max(requests, 1); // to avoid NaN.
-            }
-        });
-        tombstoneScannedHistogram = createColumnFamilyHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram);
-        liveScannedHistogram = createColumnFamilyHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram);
-        colUpdateTimeDeltaHistogram = createColumnFamilyHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram);
-        coordinatorReadLatency = Metrics.timer(factory.createMetricName("CoordinatorReadLatency"));
-        coordinatorScanLatency = Metrics.timer(factory.createMetricName("CoordinatorScanLatency"));
-        waitingOnFreeMemtableSpace = Metrics.histogram(factory.createMetricName("WaitingOnFreeMemtableSpace"));
-
-        trueSnapshotsSize = createColumnFamilyGauge("SnapshotsSize", new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                return cfs.trueSnapshotsSize();
-            }
-        });
-        rowCacheHitOutOfRange = createColumnFamilyCounter("RowCacheHitOutOfRange");
-        rowCacheHit = createColumnFamilyCounter("RowCacheHit");
-        rowCacheMiss = createColumnFamilyCounter("RowCacheMiss");
-
-        casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
-        casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
-        casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
-    }
-
-    public void updateSSTableIterated(int count)
-    {
-        sstablesPerReadHistogram.update(count);
-    }
-
-    /**
-     * Release all associated metrics.
-     */
-    public void release()
-    {
-        for(String name : all)
-        {
-            allColumnFamilyMetrics.get(name).remove(Metrics.getMetrics().get(factory.createMetricName(name).getMetricName()));
-            Metrics.remove(factory.createMetricName(name));
-        }
-        readLatency.release();
-        writeLatency.release();
-        rangeLatency.release();
-        Metrics.remove(factory.createMetricName("EstimatedRowSizeHistogram"));
-        Metrics.remove(factory.createMetricName("EstimatedRowCount"));
-        Metrics.remove(factory.createMetricName("EstimatedColumnCountHistogram"));
-        Metrics.remove(factory.createMetricName("KeyCacheHitRate"));
-        Metrics.remove(factory.createMetricName("CoordinatorReadLatency"));
-        Metrics.remove(factory.createMetricName("CoordinatorScanLatency"));
-        Metrics.remove(factory.createMetricName("WaitingOnFreeMemtableSpace"));
-    }
-
-
-    /**
-     * Create a gauge that will be part of a merged version of all column families.  The global gauge
-     * will merge each CF gauge by adding their values 
-     */
-    protected <T extends Number> Gauge<T> createColumnFamilyGauge(final String name, Gauge<T> gauge)
-    {
-        return createColumnFamilyGauge(name, gauge, new Gauge<Long>()
-        {
-            public Long getValue()
-            {
-                long total = 0;
-                for (Metric cfGauge : allColumnFamilyMetrics.get(name))
-                {
-                    total = total + ((Gauge<? extends Number>) cfGauge).getValue().longValue();
-                }
-                return total;
-            }
-        });
-    }
-    
-    /**
-     * Create a gauge that will be part of a merged version of all column families.  The global gauge
-     * is defined as the globalGauge parameter
-     */
-    protected <G,T> Gauge<T> createColumnFamilyGauge(String name, Gauge<T> gauge, Gauge<G> globalGauge)
-    {
-        Gauge<T> cfGauge = Metrics.register(factory.createMetricName(name), gauge);
-        if (register(name, cfGauge))
-        {
-            Metrics.register(globalNameFactory.createMetricName(name), globalGauge);
-        }
-        return cfGauge;
-    }
-    
-    /**
-     * Creates a counter that will also have a global counter that is the sum of all counters across
-     * different column families
-     */
-    protected Counter createColumnFamilyCounter(final String name)
-    {
-        Counter cfCounter = Metrics.counter(factory.createMetricName(name));
-        if (register(name, cfCounter))
-        {
-            Metrics.register(globalNameFactory.createMetricName(name), new Gauge<Long>()
-            {
-                public Long getValue()
-                {
-                    long total = 0;
-                    for (Metric cfGauge : allColumnFamilyMetrics.get(name))
-                    {
-                        total += ((Counter) cfGauge).getCount();
-                    }
-                    return total;
-                }
-            });
-        }
-        return cfCounter;
-    }
-    
-    /**
-     * Create a histogram-like interface that will register a CF, a keyspace and a global level
-     * histogram and forward any updates to all three
-     */
-    protected ColumnFamilyHistogram createColumnFamilyHistogram(String name, Histogram keyspaceHistogram)
-    {
-        Histogram cfHistogram = Metrics.histogram(factory.createMetricName(name));
-        register(name, cfHistogram);
-        return new ColumnFamilyHistogram(cfHistogram, keyspaceHistogram, Metrics.histogram(globalNameFactory.createMetricName(name)));
-    }
-
-    /**
-     * Registers a metric to be removed when unloading CF.
-     * @return true if first time metric with that name has been registered
-     */
-    private boolean register(String name, Metric metric)
-    { 
-        boolean ret = allColumnFamilyMetrics.putIfAbsent(name,  new HashSet<Metric>()) == null;
-        allColumnFamilyMetrics.get(name).add(metric);
-        all.add(name);
-        return ret;
-    }
-    
-    public static class ColumnFamilyHistogram
-    {
-        public final Histogram[] all;
-        public final Histogram cf;
-        private ColumnFamilyHistogram(Histogram cf, Histogram keyspace, Histogram global)
-        {
-            this.cf = cf;
-            this.all = new Histogram[]{cf, keyspace, global};
-        }
-
-        public void update(long i)
-        {
-            for(Histogram histo : all)
-            {
-                histo.update(i);
-            }
-        }
-    }
-    
-    static class ColumnFamilyMetricNameFactory implements MetricNameFactory
-    {
-        private final String keyspaceName;
-        private final String columnFamilyName;
-        private final boolean isIndex;
-
-        ColumnFamilyMetricNameFactory(ColumnFamilyStore cfs)
-        {
-            this.keyspaceName = cfs.keyspace.getName();
-            this.columnFamilyName = cfs.name;
-            isIndex = cfs.isIndex();
-        }
-
-        public CassandraMetricsRegistry.MetricName createMetricName(String metricName)
-        {
-            String groupName = ColumnFamilyMetrics.class.getPackage().getName();
-            String type = isIndex ? "IndexColumnFamily" : "ColumnFamily";
-
-            StringBuilder mbeanName = new StringBuilder();
-            mbeanName.append(groupName).append(":");
-            mbeanName.append("type=").append(type);
-            mbeanName.append(",keyspace=").append(keyspaceName);
-            mbeanName.append(",scope=").append(columnFamilyName);
-            mbeanName.append(",name=").append(metricName);
-
-            return new CassandraMetricsRegistry.MetricName(groupName, type, metricName, keyspaceName + "." + columnFamilyName, mbeanName.toString());
-        }
-    }
-    
-    static class AllColumnFamilyMetricNameFactory implements MetricNameFactory
-    {
-        public CassandraMetricsRegistry.MetricName createMetricName(String metricName)
-        {
-            String groupName = ColumnFamilyMetrics.class.getPackage().getName(); 
-            StringBuilder mbeanName = new StringBuilder();
-            mbeanName.append(groupName).append(":");
-            mbeanName.append("type=ColumnFamily");
-            mbeanName.append(",name=").append(metricName);
-            return new CassandraMetricsRegistry.MetricName(groupName, "ColumnFamily", metricName, "all", mbeanName.toString());
-        }
-    }
-
-    public static enum Sampler
-    {
-        READS, WRITES
-    }
-}
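
The per-table helpers deleted above (createColumnFamilyGauge, createColumnFamilyCounter, register) all follow the same pattern: each table registers its own metric, and the first registration of a given metric name also installs a single global metric that aggregates the per-table copies. A minimal, self-contained sketch of that pattern against the plain Codahale MetricRegistry follows; the class name PerScopeMetrics, the "scope"/"all" name prefixes and the summing aggregation are illustrative choices, not part of this patch.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;

public class PerScopeMetrics
{
    private static final MetricRegistry registry = new MetricRegistry();
    // one shared set of per-scope metric instances per metric name, mirroring allColumnFamilyMetrics above
    private static final ConcurrentMap<String, Set<Metric>> perScope = new ConcurrentHashMap<>();

    /** Register a per-scope gauge; the first registration of a name also installs the global sum gauge. */
    public static <T extends Number> Gauge<T> registerGauge(String scope, final String name, Gauge<T> gauge)
    {
        Gauge<T> registered = registry.register(scope + "." + name, gauge);
        boolean first = perScope.putIfAbsent(name, new CopyOnWriteArraySet<Metric>()) == null;
        perScope.get(name).add(registered);
        if (first)
        {
            registry.register("all." + name, new Gauge<Long>()
            {
                public Long getValue()
                {
                    long total = 0;
                    for (Metric m : perScope.get(name))
                        total += ((Gauge<? extends Number>) m).getValue().longValue();
                    return total;
                }
            });
        }
        return registered;
    }
}

A max-style global aggregate, like the one at the top of this hunk, only differs in the loop body of the global gauge.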

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index e9ce49c..8f52bb1 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -21,7 +21,6 @@ import java.util.Set;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 
@@ -38,9 +37,9 @@ public class KeyspaceMetrics
 {
     /** Total amount of live data stored in the memtable, excluding any data structure overhead */
     public final Gauge<Long> memtableLiveDataSize;
-    /** Total amount of data stored in the memtable that resides on-heap, including column related overhead and overwritten rows. */
+    /** Total amount of data stored in the memtable that resides on-heap, including column related overhead and partitions overwritten. */
     public final Gauge<Long> memtableOnHeapDataSize;
-    /** Total amount of data stored in the memtable that resides off-heap, including column related overhead and overwritten rows. */
+    /** Total amount of data stored in the memtable that resides off-heap, including column related overhead and partitions overwritten. */
     public final Gauge<Long> memtableOffHeapDataSize;
     /** Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead */
     public final Gauge<Long> allMemtablesLiveDataSize;
@@ -106,121 +105,121 @@ public class KeyspaceMetrics
         keyspace = ks;
         memtableColumnsCount = createKeyspaceGauge("MemtableColumnsCount", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.memtableColumnsCount.getValue();
             }
         });
         memtableLiveDataSize = createKeyspaceGauge("MemtableLiveDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.memtableLiveDataSize.getValue();
             }
         }); 
         memtableOnHeapDataSize = createKeyspaceGauge("MemtableOnHeapDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.memtableOnHeapSize.getValue();
             }
         });
         memtableOffHeapDataSize = createKeyspaceGauge("MemtableOffHeapDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.memtableOffHeapSize.getValue();
             }
         });
         allMemtablesLiveDataSize = createKeyspaceGauge("AllMemtablesLiveDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.allMemtablesLiveDataSize.getValue();
             }
         });
         allMemtablesOnHeapDataSize = createKeyspaceGauge("AllMemtablesOnHeapDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.allMemtablesOnHeapSize.getValue();
             }
         });
         allMemtablesOffHeapDataSize = createKeyspaceGauge("AllMemtablesOffHeapDataSize", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.allMemtablesOffHeapSize.getValue();
             }
         });
         memtableSwitchCount = createKeyspaceGauge("MemtableSwitchCount", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.memtableSwitchCount.getCount();
             }
         });
         pendingCompactions = createKeyspaceGauge("PendingCompactions", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return (long) metric.pendingCompactions.getValue();
             }
         });
         pendingFlushes = createKeyspaceGauge("PendingFlushes", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return (long) metric.pendingFlushes.getCount();
             }
         });
         liveDiskSpaceUsed = createKeyspaceGauge("LiveDiskSpaceUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.liveDiskSpaceUsed.getCount();
             }
         });
         totalDiskSpaceUsed = createKeyspaceGauge("TotalDiskSpaceUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.totalDiskSpaceUsed.getCount();
             }
         });
         bloomFilterDiskSpaceUsed = createKeyspaceGauge("BloomFilterDiskSpaceUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.bloomFilterDiskSpaceUsed.getValue();
             }
         });
         bloomFilterOffHeapMemoryUsed = createKeyspaceGauge("BloomFilterOffHeapMemoryUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.bloomFilterOffHeapMemoryUsed.getValue();
             }
         });
         indexSummaryOffHeapMemoryUsed = createKeyspaceGauge("IndexSummaryOffHeapMemoryUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.indexSummaryOffHeapMemoryUsed.getValue();
             }
         });
         compressionMetadataOffHeapMemoryUsed = createKeyspaceGauge("CompressionMetadataOffHeapMemoryUsed", new MetricValue()
         {
-            public Long getValue(ColumnFamilyMetrics metric)
+            public Long getValue(TableMetrics metric)
             {
                 return metric.compressionMetadataOffHeapMemoryUsed.getValue();
             }
         });
-        // latency metrics for ColumnFamilyMetrics to update
+        // latency metrics for TableMetrics to update
         readLatency = new LatencyMetrics(factory, "Read");
         writeLatency = new LatencyMetrics(factory, "Write");
         rangeLatency = new LatencyMetrics(factory, "Range");
-        // create histograms for ColumnFamilyMetrics to replicate updates to
+        // create histograms for TableMetrics to replicate updates to
         sstablesPerReadHistogram = Metrics.histogram(factory.createMetricName("SSTablesPerReadHistogram"));
         tombstoneScannedHistogram = Metrics.histogram(factory.createMetricName("TombstoneScannedHistogram"));
         liveScannedHistogram = Metrics.histogram(factory.createMetricName("LiveScannedHistogram"));
@@ -258,7 +257,7 @@ public class KeyspaceMetrics
          * @param metric of a column family in this keyspace
          * @return current value of a metric
          */
-        public Long getValue(ColumnFamilyMetrics metric);
+        public Long getValue(TableMetrics metric);
     }
 
     /**
@@ -295,7 +294,7 @@ public class KeyspaceMetrics
 
         public CassandraMetricsRegistry.MetricName createMetricName(String metricName)
         {
-            String groupName = ColumnFamilyMetrics.class.getPackage().getName();
+            String groupName = TableMetrics.class.getPackage().getName();
 
             StringBuilder mbeanName = new StringBuilder();
             mbeanName.append(groupName).append(":");

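
The MetricValue callback retyped in this file is the hook that keyspace-level gauges use to pull a single value out of each table's TableMetrics. The body of createKeyspaceGauge is not part of this hunk, so the following is only a sketch of the idea, assuming the keyspace gauge sums the extractor over the tables it contains; the HypotheticalTableMetrics type and its field are stand-ins, not real Cassandra classes.

import com.codahale.metrics.Gauge;

// Stand-in for the real TableMetrics type; only the shape of the callback matters here.
class HypotheticalTableMetrics
{
    long memtableLiveDataSize;
}

interface TableValue
{
    long getValue(HypotheticalTableMetrics metric);
}

class KeyspaceGaugeSketch
{
    // Assumed aggregation: sum the per-table extractor over every table in the keyspace.
    static Gauge<Long> createKeyspaceGauge(final Iterable<HypotheticalTableMetrics> tables, final TableValue extractor)
    {
        return new Gauge<Long>()
        {
            public Long getValue()
            {
                long sum = 0;
                for (HypotheticalTableMetrics t : tables)
                    sum += extractor.getValue(t);
                return sum;
            }
        };
    }
}

With that shape, each anonymous MetricValue class above reduces to picking one field, e.g. returning metric.memtableLiveDataSize.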
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0bd5170c/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
index a2eef68..a1915b1 100644
--- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -43,6 +43,7 @@ public class LatencyMetrics
     private List<LatencyMetrics> parents = Lists.newArrayList();
     
     protected final MetricNameFactory factory;
+    protected final MetricNameFactory aliasFactory;
     protected final String namePrefix;
 
     /**
@@ -76,11 +77,25 @@ public class LatencyMetrics
      */
     public LatencyMetrics(MetricNameFactory factory, String namePrefix)
     {
+        this(factory, null, namePrefix);
+    }
+
+    public LatencyMetrics(MetricNameFactory factory, MetricNameFactory aliasFactory, String namePrefix)
+    {
         this.factory = factory;
+        this.aliasFactory = aliasFactory;
         this.namePrefix = namePrefix;
 
-        latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency"));
-        totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency"));
+        if (aliasFactory == null)
+        {
+            latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency"));
+            totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency"));
+        }
+        else
+        {
+            latency = Metrics.timer(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency"));
+            totalLatency = Metrics.counter(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"));
+        }
     }
     
     /**
@@ -93,7 +108,7 @@ public class LatencyMetrics
      */
     public LatencyMetrics(MetricNameFactory factory, String namePrefix, LatencyMetrics ... parents)
     {
-        this(factory, namePrefix);
+        this(factory, null, namePrefix);
         this.parents.addAll(ImmutableList.copyOf(parents));
     }
 
@@ -111,7 +126,15 @@ public class LatencyMetrics
 
     public void release()
     {
-        Metrics.remove(factory.createMetricName(namePrefix + "Latency"));
-        Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"));
+        if (aliasFactory == null)
+        {
+            Metrics.remove(factory.createMetricName(namePrefix + "Latency"));
+            Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"));
+        }
+        else
+        {
+            Metrics.remove(factory.createMetricName(namePrefix + "Latency"), aliasFactory.createMetricName(namePrefix + "Latency"));
+            Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"), aliasFactory.createMetricName(namePrefix + "TotalLatency"));
+        }
     }
 }
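
The two-argument timer/counter/remove calls introduced above register each latency metric under its primary name and, when an aliasFactory is supplied, under a second alias name as well, presumably so the same metric stays reachable under both naming schemes. The CassandraMetricsRegistry overloads are not reproduced here; the sketch below only shows the underlying idea with the plain Codahale MetricRegistry, registering one shared Timer instance under two names and dropping both on release. All names in it are illustrative.

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

// Minimal sketch of metric aliasing: one Timer instance shared between a primary and an alias name.
class AliasedTimerSketch
{
    private final MetricRegistry registry;
    private final String primaryName;
    private final String aliasName;
    private final Timer timer;

    AliasedTimerSketch(MetricRegistry registry, String primaryName, String aliasName)
    {
        this.registry = registry;
        this.primaryName = primaryName;
        this.aliasName = aliasName;
        // Registering the same Timer under both names keeps the two views in sync for free.
        this.timer = registry.register(primaryName, new Timer());
        registry.register(aliasName, timer);
    }

    void addNano(long nanos)
    {
        timer.update(nanos, TimeUnit.NANOSECONDS);
    }

    void release()
    {
        // Mirrors LatencyMetrics.release(): drop both registrations when the owning table is unloaded.
        registry.remove(primaryName);
        registry.remove(aliasName);
    }
}

For example, a read latency timer registered as new AliasedTimerSketch(registry, "Table.ReadLatency.ks.t", "ColumnFamily.ReadLatency.ks.t") reports the same underlying timer under both names.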