You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/08/30 14:48:36 UTC
[1/3] git commit: New metrics;
patch by yukim reviewed by brandonwilliams for CASSANDRA-4009
Updated Branches:
refs/heads/trunk 0525ae25f -> 69cedbfca
New metrics; patch by yukim reviewed by brandonwilliams for CASSANDRA-4009
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/69cedbfc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/69cedbfc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/69cedbfc
Branch: refs/heads/trunk
Commit: 69cedbfcaad18f40dcf97648c8adf129614054db
Parents: 4e6167d
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 30 07:34:33 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 30 07:47:56 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cache/AutoSavingCache.java | 2 +-
.../apache/cassandra/cache/InstrumentingCache.java | 51 +--
.../cassandra/concurrent/IExecutorMBean.java | 4 +
.../concurrent/JMXEnabledThreadPoolExecutor.java | 26 +-
.../JMXEnabledThreadPoolExecutorMBean.java | 4 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 97 ++---
.../cassandra/db/ColumnFamilyStoreMBean.java | 76 ++++
src/java/org/apache/cassandra/db/DataTracker.java | 154 +------
.../apache/cassandra/db/commitlog/CommitLog.java | 14 +-
.../cassandra/db/commitlog/CommitLogMBean.java | 8 +-
.../cassandra/db/compaction/CompactionManager.java | 96 ++---
.../db/compaction/CompactionManagerMBean.java | 8 +
.../org/apache/cassandra/metrics/CacheMetrics.java | 108 +++++
.../cassandra/metrics/ClientRequestMetrics.java | 32 ++-
.../cassandra/metrics/ColumnFamilyMetrics.java | 341 +++++++++++++++
.../apache/cassandra/metrics/CommitLogMetrics.java | 66 +++
.../cassandra/metrics/CompactionMetrics.java | 107 +++++
.../cassandra/metrics/ConnectionMetrics.java | 132 ++++++
.../cassandra/metrics/DroppedMessageMetrics.java | 54 +++
.../apache/cassandra/metrics/LatencyMetrics.java | 142 ++++++
.../cassandra/metrics/MetricNameFactory.java | 31 ++
.../apache/cassandra/metrics/StorageMetrics.java | 30 ++
.../apache/cassandra/metrics/StreamingMetrics.java | 60 +++
.../cassandra/metrics/ThreadPoolMetrics.java | 116 +++++
.../org/apache/cassandra/net/MessagingService.java | 101 ++---
.../cassandra/net/OutboundTcpConnectionPool.java | 23 +
.../apache/cassandra/scheduler/WeightedQueue.java | 18 +-
.../cassandra/scheduler/WeightedQueueMBean.java | 2 +
.../org/apache/cassandra/service/CacheService.java | 33 +-
.../cassandra/service/CacheServiceMBean.java | 31 ++
.../org/apache/cassandra/service/StorageProxy.java | 50 ++-
.../cassandra/service/StorageProxyMBean.java | 30 ++
.../apache/cassandra/service/StorageService.java | 10 +-
.../cassandra/service/StorageServiceMBean.java | 2 +
.../apache/cassandra/streaming/FileStreamTask.java | 13 +-
.../cassandra/streaming/IncomingStreamReader.java | 7 +
.../compress/CompressedFileStreamTask.java | 9 +-
38 files changed, 1628 insertions(+), 461 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 271f5b7..9d09d57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-dev
+ * new metrics (CASSANDRA-4009)
* debug tracing (CASSANDRA-1123)
* parallelize row cache loading (CASSANDRA-4282)
* Make compaction, flush JBOD-aware (CASSANDRA-4292)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 2587619..d7c1cf7 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -59,7 +59,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
public AutoSavingCache(ICache<K, V> cache, CacheService.CacheType cacheType, CacheSerializer<K, V> cacheloader)
{
- super(cache);
+ super(cacheType.toString(), cache);
this.cacheType = cacheType;
this.cacheLoader = cacheloader;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/cache/InstrumentingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
index fda1c6b..9655e8d 100644
--- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java
+++ b/src/java/org/apache/cassandra/cache/InstrumentingCache.java
@@ -18,23 +18,25 @@
package org.apache.cassandra.cache;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.metrics.CacheMetrics;
/**
* Wraps an ICache in requests + hits tracking.
*/
public class InstrumentingCache<K, V>
{
- private final AtomicLong requests = new AtomicLong(0);
- private final AtomicLong hits = new AtomicLong(0);
- private final AtomicLong lastRequests = new AtomicLong(0);
- private final AtomicLong lastHits = new AtomicLong(0);
private volatile boolean capacitySetManually;
private final ICache<K, V> map;
+ private final String type;
+
+ private CacheMetrics metrics;
- public InstrumentingCache(ICache<K, V> map)
+ public InstrumentingCache(String type, ICache<K, V> map)
{
this.map = map;
+ this.type = type;
+ this.metrics = new CacheMetrics(type, map);
}
public void put(K key, V value)
@@ -55,9 +57,9 @@ public class InstrumentingCache<K, V>
public V get(K key)
{
V v = map.get(key);
- requests.incrementAndGet();
+ metrics.requests.mark();
if (v != null)
- hits.incrementAndGet();
+ metrics.hits.mark();
return v;
}
@@ -102,36 +104,10 @@ public class InstrumentingCache<K, V>
return map.weightedSize();
}
- public long getHits()
- {
- return hits.get();
- }
-
- public long getRequests()
- {
- return requests.get();
- }
-
- public double getRecentHitRate()
- {
- long r = requests.get();
- long h = hits.get();
- try
- {
- return ((double)(h - lastHits.get())) / (r - lastRequests.get());
- }
- finally
- {
- lastRequests.set(r);
- lastHits.set(h);
- }
- }
-
public void clear()
{
map.clear();
- requests.set(0);
- hits.set(0);
+ metrics = new CacheMetrics(type, map);
}
public Set<K> getKeySet()
@@ -153,4 +129,9 @@ public class InstrumentingCache<K, V>
{
return map.isPutCopying();
}
+
+ public CacheMetrics getMetrics()
+ {
+ return metrics;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
index 29292a7..f085b9c 100644
--- a/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
+++ b/src/java/org/apache/cassandra/concurrent/IExecutorMBean.java
@@ -17,6 +17,10 @@
*/
package org.apache.cassandra.concurrent;
+/**
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated
public interface IExecutorMBean
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index f75a04b..52f9453 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -19,13 +19,14 @@ package org.apache.cassandra.concurrent;
import java.lang.management.ManagementFactory;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
/**
* This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
* for the <i>afterExecute()</i> found in the <i>ThreadPoolExecutor</i> class to log any unexpected
@@ -35,9 +36,7 @@ import javax.management.ObjectName;
public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor implements JMXEnabledThreadPoolExecutorMBean
{
private final String mbeanName;
-
- private final AtomicInteger totalBlocked = new AtomicInteger(0);
- private final AtomicInteger currentBlocked = new AtomicInteger(0);
+ private final ThreadPoolMetrics metrics;
public JMXEnabledThreadPoolExecutor(String threadPoolName)
{
@@ -75,8 +74,11 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
+ metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
+
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
+
try
{
mbs.registerMBean(this, new ObjectName(mbeanName));
@@ -102,6 +104,9 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
{
throw new RuntimeException(e);
}
+
+ // release metrics
+ metrics.release();
}
@Override
@@ -146,31 +151,30 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
public int getTotalBlockedTasks()
{
- return totalBlocked.get();
+ return (int) metrics.totalBlocked.count();
}
public int getCurrentlyBlockedTasks()
{
- return currentBlocked.get();
+ return (int) metrics.currentBlocked.count();
}
@Override
protected void onInitialRejection(Runnable task)
{
- totalBlocked.incrementAndGet();
- currentBlocked.incrementAndGet();
+ metrics.totalBlocked.inc();
+ metrics.currentBlocked.inc();
}
@Override
protected void onFinalAccept(Runnable task)
{
- currentBlocked.decrementAndGet();
+ metrics.currentBlocked.dec();
}
@Override
protected void onFinalRejection(Runnable task)
{
- currentBlocked.decrementAndGet();
+ metrics.currentBlocked.dec();
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
index fbed2c4..48136bc 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutorMBean.java
@@ -17,6 +17,10 @@
*/
package org.apache.cassandra.concurrent;
+/**
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated
public interface JMXEnabledThreadPoolExecutorMBean extends IExecutorMBean
{
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ef0e55d..464bac6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.IndexExpression;
@@ -112,20 +113,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/* Memtables and SSTables on disk for this column family */
private final DataTracker data;
- private volatile int memtableSwitchCount = 0;
-
/* This is used to generate the next index for a SSTable */
private final AtomicInteger fileIndexGenerator = new AtomicInteger(0);
public final SecondaryIndexManager indexManager;
- private final LatencyTracker readStats = new LatencyTracker();
- private final LatencyTracker writeStats = new LatencyTracker();
-
- // counts of sstables accessed by reads
- private final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
- private final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
-
private static final int INTERN_CUTOFF = 256;
public final ConcurrentMap<ByteBuffer, ByteBuffer> internedNames = new NonBlockingHashMap<ByteBuffer, ByteBuffer>();
@@ -141,6 +133,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/** ops count last time we computed liveRatio */
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
+ public final ColumnFamilyMetrics metric;
+
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -216,6 +210,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
this.partitioner = partitioner;
this.directories = directories;
this.indexManager = new SecondaryIndexManager(this);
+ this.metric = new ColumnFamilyMetrics(this);
fileIndexGenerator.set(generation);
Caching caching = metadata.getCaching();
@@ -308,21 +303,24 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
ObjectName nameObj = new ObjectName(mbeanName);
if (mbs.isRegistered(nameObj))
mbs.unregisterMBean(nameObj);
+
+ // unregister metrics
+ metric.release();
}
public long getMinRowSize()
{
- return data.getMinRowSize();
+ return metric.minRowSize.value();
}
public long getMaxRowSize()
{
- return data.getMaxRowSize();
+ return metric.maxRowSize.value();
}
public long getMeanRowSize()
{
- return data.getMeanRowSize();
+ return metric.meanRowSize.value();
}
public int getMeanColumns()
@@ -615,9 +613,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
memtable.flushAndSignal(latch, flushWriter, ctx);
}
- if (memtableSwitchCount == Integer.MAX_VALUE)
- memtableSwitchCount = 0;
- memtableSwitchCount++;
+ if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
+ metric.memtableSwitchCount.clear();
+ metric.memtableSwitchCount.inc();
// when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
// a second executor makes sure the onMemtableFlushes get called in the right order,
@@ -721,13 +719,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Memtable mt = getMemtableThreadSafe();
mt.put(key, columnFamily);
updateRowCache(key, columnFamily);
- writeStats.addNano(System.nanoTime() - start);
+ metric.writeLatency.addNano(System.nanoTime() - start);
// recompute liveRatio, if we have doubled the number of ops since last calculated
while (true)
{
long last = liveRatioComputedAt.get();
- long operations = writeStats.getOpCount();
+ long operations = metric.writeLatency.latency.count();
if (operations < 2 * last)
break;
if (liveRatioComputedAt.compareAndSet(last, operations))
@@ -955,12 +953,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long getMemtableColumnsCount()
{
- return getMemtableThreadSafe().getOperations();
+ return metric.memtableColumnsCount.value();
}
public long getMemtableDataSize()
{
- return getMemtableThreadSafe().getLiveSize();
+ return metric.memtableDataSize.value();
}
public long getTotalMemtableLiveSize()
@@ -970,7 +968,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public int getMemtableSwitchCount()
{
- return memtableSwitchCount;
+ return (int) metric.memtableSwitchCount.count();
}
/**
@@ -1008,68 +1006,67 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long[] getRecentSSTablesPerReadHistogram()
{
- return recentSSTablesPerRead.getBuckets(true);
+ return metric.recentSSTablesPerRead.getBuckets(true);
}
public long[] getSSTablesPerReadHistogram()
{
- return sstablesPerRead.getBuckets(false);
+ return metric.sstablesPerRead.getBuckets(false);
}
public long getReadCount()
{
- return readStats.getOpCount();
+ return metric.readLatency.latency.count();
}
public double getRecentReadLatencyMicros()
{
- return readStats.getRecentLatencyMicros();
+ return metric.readLatency.getRecentLatency();
}
public long[] getLifetimeReadLatencyHistogramMicros()
{
- return readStats.getTotalLatencyHistogramMicros();
+ return metric.readLatency.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentReadLatencyHistogramMicros()
{
- return readStats.getRecentLatencyHistogramMicros();
+ return metric.readLatency.recentLatencyHistogram.getBuckets(true);
}
public long getTotalReadLatencyMicros()
{
- return readStats.getTotalLatencyMicros();
+ return metric.readLatency.totalLatency.count();
}
-// TODO this actually isn't a good meature of pending tasks
public int getPendingTasks()
{
- return Table.switchLock.getQueueLength();
+ return metric.pendingTasks.value();
}
public long getWriteCount()
{
- return writeStats.getOpCount();
+ return metric.writeLatency.latency.count();
}
public long getTotalWriteLatencyMicros()
{
- return writeStats.getTotalLatencyMicros();
+ return metric.writeLatency.totalLatency.count();
}
public double getRecentWriteLatencyMicros()
{
- return writeStats.getRecentLatencyMicros();
+ return metric.writeLatency.getRecentLatency();
}
public long[] getLifetimeWriteLatencyHistogramMicros()
{
- return writeStats.getTotalLatencyHistogramMicros();
+ return metric.writeLatency.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentWriteLatencyHistogramMicros()
{
- return writeStats.getRecentLatencyHistogramMicros();
+ return metric.writeLatency.recentLatencyHistogram.getBuckets(true);
}
public ColumnFamily getColumnFamily(DecoratedKey key, QueryPath path, ByteBuffer start, ByteBuffer finish, boolean reversed, int limit)
@@ -1189,7 +1186,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
finally
{
- readStats.addNano(System.nanoTime() - start);
+ metric.readLatency.addNano(System.nanoTime() - start);
}
logger.debug("Read {} columns", result == null ? 0 : result.getColumnCount());
@@ -1317,8 +1314,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
CollationController controller = new CollationController(this, forCache, filter, gcBefore);
ColumnFamily columns = controller.getTopLevelColumns();
- recentSSTablesPerRead.add(controller.getSstablesIterated());
- sstablesPerRead.add(controller.getSstablesIterated());
+ metric.updateSSTableIterated(controller.getSstablesIterated());
return columns;
}
@@ -1548,22 +1544,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public boolean hasUnreclaimedSpace()
{
- return data.getLiveSize() < data.getTotalSize();
+ return getLiveDiskSpaceUsed() < getTotalDiskSpaceUsed();
}
public long getTotalDiskSpaceUsed()
{
- return data.getTotalSize();
+ return metric.totalDiskSpaceUsed.count();
}
public long getLiveDiskSpaceUsed()
{
- return data.getLiveSize();
+ return metric.liveDiskSpaceUsed.count();
}
public int getLiveSSTableCount()
{
- return data.getSSTables().size();
+ return metric.liveSSTableCount.value();
}
/**
@@ -1777,30 +1773,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long getBloomFilterFalsePositives()
{
- return data.getBloomFilterFalsePositives();
+ return metric.bloomFilterFalsePositives.value();
}
public long getRecentBloomFilterFalsePositives()
{
- return data.getRecentBloomFilterFalsePositives();
+ return metric.recentBloomFilterFalsePositives.value();
}
public double getBloomFilterFalseRatio()
{
- return data.getBloomFilterFalseRatio();
+ return metric.bloomFilterFalseRatio.value();
}
public double getRecentBloomFilterFalseRatio()
{
- return data.getRecentBloomFilterFalseRatio();
+ return metric.recentBloomFilterFalseRatio.value();
}
public long getBloomFilterDiskSpaceUsed()
{
- long total = 0;
- for (SSTableReader sst : getSSTables())
- total += sst.getBloomFilterSerializedSize();
- return total;
+ return metric.bloomFilterDiskSpaceUsed.value();
}
@Override
@@ -1887,17 +1880,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public long[] getEstimatedRowSizeHistogram()
{
- return data.getEstimatedRowSizeHistogram();
+ return metric.estimatedRowSizeHistogram.value();
}
public long[] getEstimatedColumnCountHistogram()
{
- return data.getEstimatedColumnCountHistogram();
+ return metric.estimatedColumnCountHistogram.value();
}
public double getCompressionRatio()
{
- return data.getCompressionRatio();
+ return metric.compressionRatio.value();
}
/** true if this CFS contains secondary index data */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 20baae9..8e7fd9d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -37,103 +37,141 @@ public interface ColumnFamilyStoreMBean
* Returns the total amount of data stored in the memtable, including
* column related overhead.
*
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableDataSize
* @return The size in bytes.
*/
+ @Deprecated
public long getMemtableDataSize();
/**
* Returns the total number of columns present in the memtable.
*
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableColumnsCount
* @return The number of columns.
*/
+ @Deprecated
public long getMemtableColumnsCount();
/**
* Returns the number of times that a flush has resulted in the
* memtable being switched out.
*
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableSwitchCount
* @return the number of memtable switches
*/
+ @Deprecated
public int getMemtableSwitchCount();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentSSTablesPerReadHistogram
* @return a histogram of the number of sstable data files accessed per read: reading this property resets it
*/
+ @Deprecated
public long[] getRecentSSTablesPerReadHistogram();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#sstablesPerReadHistogram
* @return a histogram of the number of sstable data files accessed per read
*/
+ @Deprecated
public long[] getSSTablesPerReadHistogram();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
* @return the number of read operations on this column family
*/
+ @Deprecated
public long getReadCount();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
* @return total read latency (divide by getReadCount() for average)
*/
+ @Deprecated
public long getTotalReadLatencyMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
* @return an array representing the latency histogram
*/
+ @Deprecated
public long[] getLifetimeReadLatencyHistogramMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
* @return an array representing the latency histogram
*/
+ @Deprecated
public long[] getRecentReadLatencyHistogramMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#readLatency
* @return average latency per read operation since the last call
*/
+ @Deprecated
public double getRecentReadLatencyMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
* @return the number of write operations on this column family
*/
+ @Deprecated
public long getWriteCount();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
* @return total write latency (divide by getReadCount() for average)
*/
+ @Deprecated
public long getTotalWriteLatencyMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
* @return an array representing the latency histogram
*/
+ @Deprecated
public long[] getLifetimeWriteLatencyHistogramMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
* @return an array representing the latency histogram
*/
+ @Deprecated
public long[] getRecentWriteLatencyHistogramMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#writeLatency
* @return average latency per write operation since the last call
*/
+ @Deprecated
public double getRecentWriteLatencyMicros();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingTasks
* @return the estimated number of tasks pending for this column family
*/
+ @Deprecated
public int getPendingTasks();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#liveSSTableCount
* @return the number of SSTables on disk for this CF
*/
+ @Deprecated
public int getLiveSSTableCount();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#liveDiskSpaceUsed
* @return disk space used by SSTables belonging to this CF
*/
+ @Deprecated
public long getLiveDiskSpaceUsed();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#totalDiskSpaceUsed
* @return total disk space used by SSTables belonging to this CF, including obsolete ones waiting to be GC'd
*/
+ @Deprecated
public long getTotalDiskSpaceUsed();
/**
@@ -142,28 +180,54 @@ public interface ColumnFamilyStoreMBean
public void forceMajorCompaction() throws ExecutionException, InterruptedException;
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#minRowSize
* @return the size of the smallest compacted row
*/
+ @Deprecated
public long getMinRowSize();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#maxRowSize
* @return the size of the largest compacted row
*/
+ @Deprecated
public long getMaxRowSize();
/**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#meanRowSize
* @return the size of the smallest compacted row
*/
+ @Deprecated
public long getMeanRowSize();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterFalsePositives
+ */
+ @Deprecated
public long getBloomFilterFalsePositives();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentBloomFilterFalsePositives
+ */
+ @Deprecated
public long getRecentBloomFilterFalsePositives();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterFalseRatio
+ */
+ @Deprecated
public double getBloomFilterFalseRatio();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#recentBloomFilterFalseRatio
+ */
+ @Deprecated
public double getRecentBloomFilterFalseRatio();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#bloomFilterDiskSpaceUsed
+ */
+ @Deprecated
public long getBloomFilterDiskSpaceUsed();
/**
@@ -220,8 +284,20 @@ public interface ColumnFamilyStoreMBean
public long estimateKeys();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#estimatedRowSizeHistogram
+ */
+ @Deprecated
public long[] getEstimatedRowSizeHistogram();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#estimatedColumnCountHistogram
+ */
+ @Deprecated
public long[] getEstimatedColumnCountHistogram();
+ /**
+ * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#compressionRatio
+ */
+ @Deprecated
public double getCompressionRatio();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8233fbe..1e7b1bf 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.File;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.*;
@@ -31,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -47,10 +47,6 @@ public class DataTracker
public final ColumnFamilyStore cfstore;
private final AtomicReference<View> view;
- // On disk live and total size
- private final AtomicLong liveSize = new AtomicLong();
- private final AtomicLong totalSize = new AtomicLong();
-
public DataTracker(ColumnFamilyStore cfstore)
{
this.cfstore = cfstore;
@@ -360,8 +356,9 @@ public class DataTracker
logger.debug(String.format("adding %s to list of files tracked for %s.%s",
sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
long size = sstable.bytesOnDisk();
- liveSize.addAndGet(size);
- totalSize.addAndGet(size);
+ StorageMetrics.load.inc(size);
+ cfstore.metric.liveDiskSpaceUsed.inc(size);
+ cfstore.metric.totalDiskSpaceUsed.inc(size);
sstable.setTrackedBy(this);
}
}
@@ -373,26 +370,18 @@ public class DataTracker
if (logger.isDebugEnabled())
logger.debug(String.format("removing %s from list of files tracked for %s.%s",
sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
- liveSize.addAndGet(-sstable.bytesOnDisk());
+ long size = sstable.bytesOnDisk();
+ StorageMetrics.load.dec(size);
+ cfstore.metric.liveDiskSpaceUsed.dec(size);
boolean firstToCompact = sstable.markCompacted();
assert firstToCompact : sstable + " was already marked compacted";
sstable.releaseReference();
}
}
- public long getLiveSize()
- {
- return liveSize.get();
- }
-
- public long getTotalSize()
- {
- return totalSize.get();
- }
-
public void spaceReclaimed(long size)
{
- totalSize.addAndGet(-size);
+ cfstore.metric.totalDiskSpaceUsed.dec(size);
}
public long estimatedKeys()
@@ -405,85 +394,6 @@ public class DataTracker
return n;
}
- public long[] getEstimatedRowSizeHistogram()
- {
- long[] histogram = new long[90];
-
- for (SSTableReader sstable : getSSTables())
- {
- long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
-
- for (int i = 0; i < histogram.length; i++)
- histogram[i] += rowSize[i];
- }
-
- return histogram;
- }
-
- public long[] getEstimatedColumnCountHistogram()
- {
- long[] histogram = new long[90];
-
- for (SSTableReader sstable : getSSTables())
- {
- long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
-
- for (int i = 0; i < histogram.length; i++)
- histogram[i] += columnSize[i];
- }
-
- return histogram;
- }
-
- public double getCompressionRatio()
- {
- double sum = 0;
- int total = 0;
- for (SSTableReader sstable : getSSTables())
- {
- if (sstable.getCompressionRatio() != Double.MIN_VALUE)
- {
- sum += sstable.getCompressionRatio();
- total++;
- }
- }
- return total != 0 ? (double)sum/total: 0;
- }
-
- public long getMinRowSize()
- {
- long min = 0;
- for (SSTableReader sstable : getSSTables())
- {
- if (min == 0 || sstable.getEstimatedRowSize().min() < min)
- min = sstable.getEstimatedRowSize().min();
- }
- return min;
- }
-
- public long getMaxRowSize()
- {
- long max = 0;
- for (SSTableReader sstable : getSSTables())
- {
- if (sstable.getEstimatedRowSize().max() > max)
- max = sstable.getEstimatedRowSize().max();
- }
- return max;
- }
-
- public long getMeanRowSize()
- {
- long sum = 0;
- long count = 0;
- for (SSTableReader sstable : getSSTables())
- {
- sum += sstable.getEstimatedRowSize().mean();
- count++;
- }
- return count > 0 ? sum / count : 0;
- }
-
public int getMeanColumns()
{
long sum = 0;
@@ -496,54 +406,6 @@ public class DataTracker
return count > 0 ? (int) (sum / count) : 0;
}
- public long getBloomFilterFalsePositives()
- {
- long count = 0L;
- for (SSTableReader sstable: getSSTables())
- {
- count += sstable.getBloomFilterFalsePositiveCount();
- }
- return count;
- }
-
- public long getRecentBloomFilterFalsePositives()
- {
- long count = 0L;
- for (SSTableReader sstable: getSSTables())
- {
- count += sstable.getRecentBloomFilterFalsePositiveCount();
- }
- return count;
- }
-
- public double getBloomFilterFalseRatio()
- {
- long falseCount = 0L;
- long trueCount = 0L;
- for (SSTableReader sstable: getSSTables())
- {
- falseCount += sstable.getBloomFilterFalsePositiveCount();
- trueCount += sstable.getBloomFilterTruePositiveCount();
- }
- if (falseCount == 0L && trueCount == 0L)
- return 0d;
- return (double) falseCount / (trueCount + falseCount);
- }
-
- public double getRecentBloomFilterFalseRatio()
- {
- long falseCount = 0L;
- long trueCount = 0L;
- for (SSTableReader sstable: getSSTables())
- {
- falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
- trueCount += sstable.getRecentBloomFilterTruePositiveCount();
- }
- if (falseCount == 0L && trueCount == 0L)
- return 0d;
- return (double) falseCount / (trueCount + falseCount);
- }
-
public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
{
for (INotificationConsumer subscriber : subscribers)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index e3f3c13..22abcb7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
/*
@@ -58,6 +59,8 @@ public class CommitLog implements CommitLogMBean
public CommitLogSegment activeSegment;
+ private final CommitLogMetrics metrics;
+
private CommitLog()
{
DatabaseDescriptor.createAllDirectories();
@@ -78,6 +81,9 @@ public class CommitLog implements CommitLogMBean
{
throw new RuntimeException(e);
}
+
+ // register metrics
+ metrics = new CommitLogMetrics(executor, allocator);
}
/**
@@ -272,7 +278,7 @@ public class CommitLog implements CommitLogMBean
*/
public long getCompletedTasks()
{
- return executor.getCompletedTasks();
+ return metrics.completedTasks.value();
}
/**
@@ -280,7 +286,7 @@ public class CommitLog implements CommitLogMBean
*/
public long getPendingTasks()
{
- return executor.getPendingTasks();
+ return metrics.pendingTasks.value();
}
/**
@@ -288,7 +294,7 @@ public class CommitLog implements CommitLogMBean
*/
public long getTotalCommitlogSize()
{
- return allocator.bytesUsed();
+ return metrics.totalCommitLogSize.value();
}
/**
@@ -330,7 +336,7 @@ public class CommitLog implements CommitLogMBean
segmentNames.add(segment.getName());
return segmentNames;
}
-
+
public List<String> getArchivingSegmentNames()
{
return new ArrayList<String>(archiver.archivePending.keySet());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index 3570dc4..6c0d8d7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -25,17 +25,23 @@ public interface CommitLogMBean
{
/**
* Get the number of completed tasks
+ * @see org.apache.cassandra.metrics.CommitLogMetrics#completedTasks
*/
+ @Deprecated
public long getCompletedTasks();
/**
* Get the number of tasks waiting to be executed
+ * @see org.apache.cassandra.metrics.CommitLogMetrics#pendingTasks
*/
+ @Deprecated
public long getPendingTasks();
/**
* Get the current size used by all the commitlog segments.
+ * @see org.apache.cassandra.metrics.CommitLogMetrics#totalCommitLogSize
*/
+ @Deprecated
public long getTotalCommitlogSize();
/**
@@ -52,4 +58,4 @@ public interface CommitLogMBean
* @return Files which are pending for archival attempt. Does NOT include failed archive attempts.
*/
public List<String> getArchivingSegmentNames();
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5526dc1..7474191 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
@@ -99,6 +100,7 @@ public class CompactionManager implements CompactionManagerMBean
private final CompactionExecutor executor = new CompactionExecutor();
private final CompactionExecutor validationExecutor = new ValidationExecutor();
+ private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
/**
* @return A lock, for which acquisition means no compactions can run.
@@ -148,7 +150,7 @@ public class CompactionManager implements CompactionManagerMBean
try
{
- task.execute(executor);
+ task.execute(metrics);
}
finally
{
@@ -238,7 +240,7 @@ public class CompactionManager implements CompactionManagerMBean
CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), NO_GC);
task.isUserDefined(true);
task.setCompactionType(OperationType.UPGRADE_SSTABLES);
- task.execute(executor);
+ task.execute(metrics);
}
}
});
@@ -293,7 +295,7 @@ public class CompactionManager implements CompactionManagerMBean
compactionLock.writeLock().unlock();
try
{
- task.execute(executor);
+ task.execute(metrics);
}
finally
{
@@ -390,7 +392,7 @@ public class CompactionManager implements CompactionManagerMBean
{
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
AbstractCompactionTask task = strategy.getUserDefinedTask(toCompact, gcBefore);
- task.execute(executor);
+ task.execute(metrics);
}
finally
{
@@ -488,7 +490,7 @@ public class CompactionManager implements CompactionManagerMBean
Scrubber scrubber = new Scrubber(cfs, sstable);
CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
- executor.beginCompaction(scrubInfo);
+ metrics.beginCompaction(scrubInfo);
try
{
scrubber.scrub();
@@ -496,7 +498,7 @@ public class CompactionManager implements CompactionManagerMBean
finally
{
scrubber.close();
- executor.finishCompaction(scrubInfo);
+ metrics.finishCompaction(scrubInfo);
}
if (scrubber.getNewInOrderSSTable() != null)
@@ -561,7 +563,7 @@ public class CompactionManager implements CompactionManagerMBean
List<IColumn> indexedColumnsInRow = null;
CleanupInfo ci = new CleanupInfo(sstable, scanner);
- executor.beginCompaction(ci);
+ metrics.beginCompaction(ci);
try
{
while (scanner.hasNext())
@@ -631,7 +633,7 @@ public class CompactionManager implements CompactionManagerMBean
finally
{
scanner.close();
- executor.finishCompaction(ci);
+ metrics.finishCompaction(ci);
}
List<SSTableReader> results = new ArrayList<SSTableReader>(1);
@@ -711,7 +713,7 @@ public class CompactionManager implements CompactionManagerMBean
CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- validationExecutor.beginCompaction(ci);
+ metrics.beginCompaction(ci);
try
{
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
@@ -734,7 +736,7 @@ public class CompactionManager implements CompactionManagerMBean
if (cfs.table.snapshotExists(validator.request.sessionid))
cfs.table.clearSnapshot(validator.request.sessionid);
- validationExecutor.finishCompaction(ci);
+ metrics.finishCompaction(ci);
}
}
@@ -750,14 +752,14 @@ public class CompactionManager implements CompactionManagerMBean
compactionLock.readLock().lock();
try
{
- executor.beginCompaction(builder);
+ metrics.beginCompaction(builder);
try
{
builder.build();
}
finally
{
- executor.finishCompaction(builder);
+ metrics.finishCompaction(builder);
}
}
finally
@@ -790,14 +792,14 @@ public class CompactionManager implements CompactionManagerMBean
}
try
{
- executor.beginCompaction(writer);
+ metrics.beginCompaction(writer);
try
{
writer.saveCache();
}
finally
{
- executor.finishCompaction(writer);
+ metrics.finishCompaction(writer);
}
}
finally
@@ -873,15 +875,11 @@ public class CompactionManager implements CompactionManagerMBean
public int getActiveCompactions()
{
- return CompactionExecutor.compactions.size();
+ return CompactionMetrics.getCompactions().size();
}
- private static class CompactionExecutor extends ThreadPoolExecutor implements CompactionExecutorStatsCollector
+ private static class CompactionExecutor extends ThreadPoolExecutor
{
- // a synchronized identity set of running tasks to their compaction info
- private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
- private volatile long totalBytesCompacted = 0L;
- private volatile long totalCompactionsCompleted = 0L;
protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
{
@@ -899,40 +897,6 @@ public class CompactionManager implements CompactionManagerMBean
this(Math.max(1, DatabaseDescriptor.getConcurrentCompactors()), "CompactionExecutor");
}
- public void beginCompaction(CompactionInfo.Holder ci)
- {
- // notify
- ci.started();
- compactions.add(ci);
- }
-
- public void finishCompaction(CompactionInfo.Holder ci)
- {
- // notify
- ci.finished();
- compactions.remove(ci);
- totalBytesCompacted += ci.getCompactionInfo().getTotal();
- totalCompactionsCompleted += 1;
- }
-
- public static List<CompactionInfo.Holder> getCompactions()
- {
- return new ArrayList<CompactionInfo.Holder>(compactions);
- }
-
- public long getTotalBytesCompacted()
- {
- long bytesCompletedInProgress = 0L;
- for (CompactionInfo.Holder ci : compactions)
- bytesCompletedInProgress += ci.getCompactionInfo().getCompleted();
- return bytesCompletedInProgress + totalBytesCompacted;
- }
-
- public long getTotalCompactionsCompleted()
- {
- return totalCompactionsCompleted;
- }
-
// modified from DebuggableThreadPoolExecutor so that CompactionInterruptedExceptions are not logged
@Override
public void afterExecute(Runnable r, Throwable t)
@@ -973,7 +937,7 @@ public class CompactionManager implements CompactionManagerMBean
public List<Map<String, String>> getCompactions()
{
- List<Holder> compactionHolders = CompactionExecutor.getCompactions();
+ List<Holder> compactionHolders = CompactionMetrics.getCompactions();
List<Map<String, String>> out = new ArrayList<Map<String, String>>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().asMap());
@@ -982,7 +946,7 @@ public class CompactionManager implements CompactionManagerMBean
public List<String> getCompactionSummary()
{
- List<Holder> compactionHolders = CompactionExecutor.getCompactions();
+ List<Holder> compactionHolders = CompactionMetrics.getCompactions();
List<String> out = new ArrayList<String>(compactionHolders.size());
for (CompactionInfo.Holder ci : compactionHolders)
out.add(ci.getCompactionInfo().toString());
@@ -991,30 +955,22 @@ public class CompactionManager implements CompactionManagerMBean
public long getTotalBytesCompacted()
{
- return executor.getTotalBytesCompacted() + validationExecutor.getTotalBytesCompacted();
+ return metrics.bytesCompacted.count();
}
public long getTotalCompactionsCompleted()
{
- return executor.getTotalCompactionsCompleted() + validationExecutor.getTotalCompactionsCompleted();
+ return metrics.totalCompactionsCompleted.count();
}
public int getPendingTasks()
{
- int n = 0;
- for (String tableName : Schema.instance.getTables())
- {
- for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
- {
- n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
- }
- }
- return (int) (executor.getTaskCount() + validationExecutor.getTaskCount() - executor.getCompletedTaskCount() - validationExecutor.getCompletedTaskCount()) + n;
+ return metrics.pendingTasks.value();
}
public long getCompletedTasks()
{
- return executor.getCompletedTaskCount() + validationExecutor.getCompletedTaskCount();
+ return metrics.completedTasks.value();
}
private static class SimpleFuture implements Future
@@ -1083,7 +1039,7 @@ public class CompactionManager implements CompactionManagerMBean
public void stopCompaction(String type)
{
OperationType operation = OperationType.valueOf(type);
- for (Holder holder : CompactionExecutor.getCompactions())
+ for (Holder holder : CompactionMetrics.getCompactions())
{
if (holder.getCompactionInfo().getTaskType() == operation)
holder.stop();
@@ -1100,7 +1056,7 @@ public class CompactionManager implements CompactionManagerMBean
{
assert columnFamilies != null;
- for (Holder compactionHolder : CompactionExecutor.getCompactions())
+ for (Holder compactionHolder : CompactionMetrics.getCompactions())
{
CompactionInfo info = compactionHolder.getCompactionInfo();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 14fad26..5f1a156 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -29,23 +29,31 @@ public interface CompactionManagerMBean
public List<String> getCompactionSummary();
/**
+ * @see org.apache.cassandra.metrics.CompactionMetrics#pendingTasks
* @return estimated number of compactions remaining to perform
*/
+ @Deprecated
public int getPendingTasks();
/**
+ * @see org.apache.cassandra.metrics.CompactionMetrics#completedTasks
* @return number of completed compactions since server [re]start
*/
+ @Deprecated
public long getCompletedTasks();
/**
+ * @see org.apache.cassandra.metrics.CompactionMetrics#totalBytesCompacted
* @return total number of bytes compacted since server [re]start
*/
+ @Deprecated
public long getTotalBytesCompacted();
/**
+ * @see org.apache.cassandra.metrics.CompactionMetrics#totalCompactionsCompleted
* @return total number of compactions since server [re]start
*/
+ @Deprecated
public long getTotalCompactionsCompleted();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CacheMetrics.java b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
new file mode 100644
index 0000000..dee9319
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CacheMetrics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.util.RatioGauge;
+
+import org.apache.cassandra.cache.ICache;
+
+/**
+ * Metrics for {@code ICache}.
+ */
+public class CacheMetrics
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "Cache";
+
+ /** Cache capacity in bytes */
+ public final Gauge<Long> capacityInBytes;
+ /** Total number of cache hits */
+ public final Meter hits;
+ /** Total number of cache requests */
+ public final Meter requests;
+ /** Cache hit rate */
+ public final Gauge<Double> hitRate;
+ /** Total size of cache */
+ public final Gauge<Long> size;
+
+ private final AtomicLong lastRequests = new AtomicLong(0);
+ private final AtomicLong lastHits = new AtomicLong(0);
+
+ /**
+ * Create metrics for given cache.
+ *
+ * @param type Type of Cache to identify metrics.
+ * @param cache Cache to measure metrics
+ */
+ public CacheMetrics(String type, final ICache cache)
+ {
+ capacityInBytes = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CapacityInBytes", type), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return cache.capacity();
+ }
+ });
+ hits = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Hits", type), "hits", TimeUnit.SECONDS);
+ requests = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Requests", type), "requests", TimeUnit.SECONDS);
+ hitRate = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "HitRate", type), new RatioGauge()
+ {
+ protected double getNumerator()
+ {
+ return hits.count();
+ }
+
+ protected double getDenominator()
+ {
+ return requests.count();
+ }
+ });
+ size = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "Size", type), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return cache.weightedSize();
+ }
+ });
+ }
+
+ // for backward compatibility
+ @Deprecated
+ public double getRecentHitRate()
+ {
+ long r = requests.count();
+ long h = hits.count();
+ try
+ {
+ return ((double)(h - lastHits.get())) / (r - lastRequests.get());
+ }
+ finally
+ {
+ lastRequests.set(r);
+ lastHits.set(h);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 02ca0fe..1e9a3b9 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -20,13 +20,35 @@
*/
package org.apache.cassandra.metrics;
+import java.util.concurrent.TimeUnit;
+
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
-public class ClientRequestMetrics
+public class ClientRequestMetrics extends LatencyMetrics
{
- public static final Counter readTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "ReadTimeouts");
- public static final Counter writeTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "WriteTimeouts");
- public static final Counter readUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "ReadUnavailables");
- public static final Counter writeUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "WriteUnavailables");
+ @Deprecated public static final Counter readTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "ReadTimeouts");
+ @Deprecated public static final Counter writeTimeouts = Metrics.newCounter(ClientRequestMetrics.class, "WriteTimeouts");
+ @Deprecated public static final Counter readUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "ReadUnavailables");
+ @Deprecated public static final Counter writeUnavailables = Metrics.newCounter(ClientRequestMetrics.class, "WriteUnavailables");
+
+ public final Meter timeouts;
+ public final Meter unavailables;
+
+ public ClientRequestMetrics(String scope)
+ {
+ super("org.apache.cassandra.metrics", "ClientRequest", scope);
+
+ timeouts = Metrics.newMeter(factory.createMetricName("Timeouts"), "timeouts", TimeUnit.SECONDS);
+ unavailables = Metrics.newMeter(factory.createMetricName("Unavailables"), "unavailables", TimeUnit.SECONDS);
+ }
+
+ public void release()
+ {
+ super.release();
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("Timeouts"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("Unavailables"));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
new file mode 100644
index 0000000..e206daf
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * Metrics for {@link ColumnFamilyStore}.
+ */
+public class ColumnFamilyMetrics
+{
+ /** Total amount of data stored in the memtable, including column related overhead. */
+ public final Gauge<Long> memtableDataSize;
+ /** Total number of columns present in the memtable. */
+ public final Gauge<Long> memtableColumnsCount;
+ /** Number of times flush has resulted in the memtable being switched out. */
+ public final Counter memtableSwitchCount;
+ /** Current compression ratio for all SSTables */
+ public final Gauge<Double> compressionRatio;
+ /** Histogram of estimated row size (in bytes). */
+ public final Gauge<long[]> estimatedRowSizeHistogram;
+ /** Histogram of estimated number of columns. */
+ public final Gauge<long[]> estimatedColumnCountHistogram;
+ /** Histogram of the number of sstable data files accessed per read */
+ public final Histogram sstablesPerReadHistogram;
+ /** Read metrics */
+ public final LatencyMetrics readLatency;
+ /** Write metrics */
+ public final LatencyMetrics writeLatency;
+ /** Estimated number of tasks pending for this column family */
+ public final Gauge<Integer> pendingTasks;
+ /** Number of SSTables on disk for this CF */
+ public final Gauge<Integer> liveSSTableCount;
+ /** Disk space used by SSTables belonging to this CF */
+ public final Counter liveDiskSpaceUsed;
+ /** Total disk space used by SSTables belonging to this CF, including obsolete ones waiting to be GC'd */
+ public final Counter totalDiskSpaceUsed;
+ /** Size of the smallest compacted row */
+ public final Gauge<Long> minRowSize;
+ /** Size of the largest compacted row */
+ public final Gauge<Long> maxRowSize;
+ /** Average size of compacted rows */
+ public final Gauge<Long> meanRowSize;
+ /** Number of false positives in bloom filter */
+ public final Gauge<Long> bloomFilterFalsePositives;
+ /** Number of false positives in bloom filter from last read */
+ public final Gauge<Long> recentBloomFilterFalsePositives;
+ /** False positive ratio of bloom filter */
+ public final Gauge<Double> bloomFilterFalseRatio;
+ /** False positive ratio of bloom filter from last read */
+ public final Gauge<Double> recentBloomFilterFalseRatio;
+ /** Disk space used by bloom filter */
+ public final Gauge<Long> bloomFilterDiskSpaceUsed;
+
+ private final MetricNameFactory factory;
+
+ // for backward compatibility
+ @Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
+ @Deprecated public final EstimatedHistogram recentSSTablesPerRead = new EstimatedHistogram(35);
+
+ /**
+ * Creates metrics for given {@link ColumnFamilyStore}.
+ *
+ * @param cfs ColumnFamilyStore to measure metrics
+ */
+ public ColumnFamilyMetrics(final ColumnFamilyStore cfs)
+ {
+ factory = new ColumnFamilyMetricNameFactory(cfs);
+
+ memtableColumnsCount = Metrics.newGauge(factory.createMetricName("MemtableColumnsCount"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return cfs.getDataTracker().getMemtable().getOperations();
+ }
+ });
+ memtableDataSize = Metrics.newGauge(factory.createMetricName("MemtableDataSize"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return cfs.getDataTracker().getMemtable().getLiveSize();
+ }
+ });
+ memtableSwitchCount = Metrics.newCounter(factory.createMetricName("MemtableSwitchCount"));
+ estimatedRowSizeHistogram = Metrics.newGauge(factory.createMetricName("EstimatedRowSizeHistogram"), new Gauge<long[]>()
+ {
+ public long[] value()
+ {
+ long[] histogram = new long[90];
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ long[] rowSize = sstable.getEstimatedRowSize().getBuckets(false);
+ for (int i = 0; i < histogram.length; i++)
+ histogram[i] += rowSize[i];
+ }
+ return histogram;
+ }
+ });
+ estimatedColumnCountHistogram = Metrics.newGauge(factory.createMetricName("EstimatedColumnCountHistogram"), new Gauge<long[]>()
+ {
+ public long[] value()
+ {
+ long[] histogram = new long[90];
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ long[] columnSize = sstable.getEstimatedColumnCount().getBuckets(false);
+ for (int i = 0; i < histogram.length; i++)
+ histogram[i] += columnSize[i];
+ }
+ return histogram;
+ }
+ });
+ sstablesPerReadHistogram = Metrics.newHistogram(factory.createMetricName("SSTablesPerReadHistogram"));
+ compressionRatio = Metrics.newGauge(factory.createMetricName("CompressionRatio"), new Gauge<Double>()
+ {
+ public Double value()
+ {
+ double sum = 0;
+ int total = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (sstable.getCompressionRatio() != Double.MIN_VALUE)
+ {
+ sum += sstable.getCompressionRatio();
+ total++;
+ }
+ }
+ return total != 0 ? (double)sum/total: 0;
+ }
+ });
+ readLatency = new LatencyMetrics(factory, "Read");
+ writeLatency = new LatencyMetrics(factory, "Write");
+ pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ // TODO this actually isn't a good measure of pending tasks
+ return Table.switchLock.getQueueLength();
+ }
+ });
+ liveSSTableCount = Metrics.newGauge(factory.createMetricName("LiveSSTableCount"), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ return cfs.getDataTracker().getSSTables().size();
+ }
+ });
+ liveDiskSpaceUsed = Metrics.newCounter(factory.createMetricName("LiveDiskSpaceUsed"));
+ totalDiskSpaceUsed = Metrics.newCounter(factory.createMetricName("TotalDiskSpaceUsed"));
+ minRowSize = Metrics.newGauge(factory.createMetricName("MinRowSize"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long min = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (min == 0 || sstable.getEstimatedRowSize().min() < min)
+ min = sstable.getEstimatedRowSize().min();
+ }
+ return min;
+ }
+ });
+ maxRowSize = Metrics.newGauge(factory.createMetricName("MaxRowSize"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long max = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ if (sstable.getEstimatedRowSize().max() > max)
+ max = sstable.getEstimatedRowSize().max();
+ }
+ return max;
+ }
+ });
+ meanRowSize = Metrics.newGauge(factory.createMetricName("MeanRowSize"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long sum = 0;
+ long count = 0;
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ sum += sstable.getEstimatedRowSize().mean();
+ count++;
+ }
+ return count > 0 ? sum / count : 0;
+ }
+ });
+ bloomFilterFalsePositives = Metrics.newGauge(factory.createMetricName("BloomFilterFalsePositives"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long count = 0L;
+ for (SSTableReader sstable: cfs.getSSTables())
+ count += sstable.getBloomFilterFalsePositiveCount();
+ return count;
+ }
+ });
+ recentBloomFilterFalsePositives = Metrics.newGauge(factory.createMetricName("RecentBloomFilterFalsePositives"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long count = 0L;
+ for (SSTableReader sstable: cfs.getSSTables())
+ count += sstable.getRecentBloomFilterFalsePositiveCount();
+ return count;
+ }
+ });
+ bloomFilterFalseRatio = Metrics.newGauge(factory.createMetricName("BloomFilterFalseRatio"), new Gauge<Double>()
+ {
+ public Double value()
+ {
+ long falseCount = 0L;
+ long trueCount = 0L;
+ for (SSTableReader sstable: cfs.getSSTables())
+ {
+ falseCount += sstable.getBloomFilterFalsePositiveCount();
+ trueCount += sstable.getBloomFilterTruePositiveCount();
+ }
+ if (falseCount == 0L && trueCount == 0L)
+ return 0d;
+ return (double) falseCount / (trueCount + falseCount);
+ }
+ });
+ recentBloomFilterFalseRatio = Metrics.newGauge(factory.createMetricName("RecentBloomFilterFalseRatio"), new Gauge<Double>()
+ {
+ public Double value()
+ {
+ long falseCount = 0L;
+ long trueCount = 0L;
+ for (SSTableReader sstable: cfs.getSSTables())
+ {
+ falseCount += sstable.getRecentBloomFilterFalsePositiveCount();
+ trueCount += sstable.getRecentBloomFilterTruePositiveCount();
+ }
+ if (falseCount == 0L && trueCount == 0L)
+ return 0d;
+ return (double) falseCount / (trueCount + falseCount);
+ }
+ });
+ bloomFilterDiskSpaceUsed = Metrics.newGauge(factory.createMetricName("BloomFilterDiskSpaceUsed"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ long total = 0;
+ for (SSTableReader sst : cfs.getSSTables())
+ total += sst.getBloomFilterSerializedSize();
+ return total;
+ }
+ });
+ }
+
+ public void updateSSTableIterated(int count)
+ {
+ sstablesPerReadHistogram.update(count);
+ recentSSTablesPerRead.add(count);
+ sstablesPerRead.add(count);
+ }
+
+ /**
+ * Release all associated metrics.
+ */
+ public void release()
+ {
+ readLatency.release();
+ writeLatency.release();
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableColumnsCount"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableDataSize"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MemtableSwitchCount"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("CompressionRatio"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("EstimatedRowSizeHistogram"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("EstimatedColumnCountHistogram"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("SSTablesPerReadHistogram"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("PendingTasks"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("LiveSSTableCount"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("LiveDiskSpaceUsed"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalDiskSpaceUsed"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MinRowSize"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MaxRowSize"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("MeanRowSize"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterFalsePositives"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("RecentBloomFilterFalsePositives"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterFalseRatio"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("RecentBloomFilterFalseRatio"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("BloomFilterDiskSpaceUsed"));
+ }
+
+ class ColumnFamilyMetricNameFactory implements MetricNameFactory
+ {
+ private final String keyspaceName;
+ private final String columnFamilyName;
+ private final boolean isIndex;
+
+ ColumnFamilyMetricNameFactory(ColumnFamilyStore cfs)
+ {
+ this.keyspaceName = cfs.table.name;
+ this.columnFamilyName = cfs.getColumnFamilyName();
+ isIndex = cfs.isIndex();
+ }
+
+ public MetricName createMetricName(String metricName)
+ {
+ String groupName = ColumnFamilyMetrics.class.getPackage().getName();
+ String type = isIndex ? "IndexColumnFamily" : "ColumnFamily";
+
+ StringBuilder mbeanName = new StringBuilder();
+ mbeanName.append(groupName).append(":");
+ mbeanName.append("type=").append(type);
+ mbeanName.append(",keyspace=").append(keyspaceName);
+ mbeanName.append(",scope=").append(columnFamilyName);
+ mbeanName.append(",name=").append(metricName);
+
+ return new MetricName(groupName, type, metricName, keyspaceName + "." + columnFamilyName, mbeanName.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
new file mode 100644
index 0000000..598d295
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.db.commitlog.CommitLogAllocator;
+import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
+
+/**
+ * Metrics for commit log
+ */
+public class CommitLogMetrics
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "CommitLog";
+
+ /** Number of completed tasks */
+ public final Gauge<Long> completedTasks;
+ /** Number of pending tasks */
+ public final Gauge<Long> pendingTasks;
+ /** Current size used by all the commit log segments */
+ public final Gauge<Long> totalCommitLogSize;
+
+ /**
+ * Registers the commit log gauges with the default metrics registry.
+ * The gauges only delegate to the supplied collaborators on read, so the
+ * executor and allocator must outlive this metrics object.
+ *
+ * @param executor commit log executor queried for task counts
+ * @param allocator segment allocator queried for total on-disk size
+ */
+ public CommitLogMetrics(final ICommitLogExecutorService executor, final CommitLogAllocator allocator)
+ {
+ completedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CompletedTasks"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return executor.getCompletedTasks();
+ }
+ });
+ pendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "PendingTasks"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return executor.getPendingTasks();
+ }
+ });
+ totalCommitLogSize = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "TotalCommitLogSize"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return allocator.bytesUsed();
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
new file mode 100644
index 0000000..ae098ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionManager;
+
+/**
+ * Metrics for compaction.
+ */
+public class CompactionMetrics implements CompactionManager.CompactionExecutorStatsCollector
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "Compaction";
+
+ // a synchronized identity set of running tasks to their compaction info
+ // (identity semantics so distinct Holder instances are never conflated,
+ // even if they compare equal)
+ private static final Set<CompactionInfo.Holder> compactions = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<CompactionInfo.Holder, Boolean>()));
+
+ /** Estimated number of compactions remaining to perform */
+ public final Gauge<Integer> pendingTasks;
+ /** Number of completed compactions since server [re]start */
+ public final Gauge<Long> completedTasks;
+ /** Total number of compactions since server [re]start */
+ public final Meter totalCompactionsCompleted;
+ /** Total number of bytes compacted since server [re]start */
+ public final Counter bytesCompacted;
+
+ /**
+ * Registers compaction gauges/meters with the default metrics registry.
+ *
+ * @param collectors executors whose queued/completed task counts are folded
+ *                   into the pending and completed gauges
+ */
+ public CompactionMetrics(final ThreadPoolExecutor... collectors)
+ {
+ pendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "PendingTasks"), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ int n = 0;
+ // estimated tasks from every CF's compaction strategy...
+ for (String tableName : Schema.instance.getTables())
+ {
+ for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
+ n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+ }
+ // ...plus tasks already submitted to the executors but not yet done
+ for (ThreadPoolExecutor collector : collectors)
+ n += collector.getTaskCount() - collector.getCompletedTaskCount();
+ return n;
+ }
+ });
+ completedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CompletedTasks"), new Gauge<Long>()
+ {
+ // NOTE(review): local intentionally sums across all executors; it
+ // shadows the enclosing field of the same name
+ public Long value()
+ {
+ long completedTasks = 0;
+ for (ThreadPoolExecutor collector : collectors)
+ completedTasks += collector.getCompletedTaskCount();
+ return completedTasks;
+ }
+ });
+ totalCompactionsCompleted = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalCompactionsCompleted"), "compaction completed", TimeUnit.SECONDS);
+ bytesCompacted = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "BytesCompacted"));
+ }
+
+ /** Marks {@code ci} started and tracks it in the running-compactions set. */
+ public void beginCompaction(CompactionInfo.Holder ci)
+ {
+ // notify
+ ci.started();
+ compactions.add(ci);
+ }
+
+ /**
+ * Marks {@code ci} finished, stops tracking it, and folds its total bytes
+ * into the bytesCompacted counter and the completion meter.
+ */
+ public void finishCompaction(CompactionInfo.Holder ci)
+ {
+ // notify
+ ci.finished();
+ compactions.remove(ci);
+ bytesCompacted.inc(ci.getCompactionInfo().getTotal());
+ totalCompactionsCompleted.mark();
+ }
+
+ /** @return a snapshot copy of the currently running compactions */
+ public static List<CompactionInfo.Holder> getCompactions()
+ {
+ return new ArrayList<CompactionInfo.Holder>(compactions);
+ }
+}