You are viewing a plain-text version of this content; the canonical link was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/15 16:23:44 UTC
cassandra git commit: Add metric for number of dropped mutations
Repository: cassandra
Updated Branches:
refs/heads/trunk 6148f5214 -> 66d3428e3
Add metric for number of dropped mutations
patch by Anubhav Kale; reviewed by Paulo Motta for CASSANDRA-10866
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66d3428e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66d3428e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66d3428e
Branch: refs/heads/trunk
Commit: 66d3428e3fe64851fa7587ee69b53e20bb7c09b5
Parents: 6148f52
Author: anubhavkale <an...@microsoft.com>
Authored: Fri Jan 15 10:20:42 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Fri Jan 15 10:20:42 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/metrics/TableMetrics.java | 4 +++
.../apache/cassandra/net/MessagingService.java | 27 ++++++++++++++++++++
.../apache/cassandra/service/StorageProxy.java | 22 ++++++++++++----
.../org/apache/cassandra/tools/NodeProbe.java | 1 +
.../cassandra/tools/nodetool/TableStats.java | 1 +
6 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61284f2..33b9f9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.4
+ * Add metric for number of dropped mutations (CASSANDRA-10866)
* Simplify row cache invalidation code (CASSANDRA-10396)
* Support user-defined compaction through nodetool (CASSANDRA-10660)
* Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index c8c214e..6492833 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -142,6 +142,9 @@ public class TableMetrics
/** Time spent waiting for free memtable space, either on- or off-heap */
public final Histogram waitingOnFreeMemtableSpace;
+ /** Dropped Mutations Count */
+ public final Counter droppedMutations;
+
private final MetricNameFactory factory;
private final MetricNameFactory aliasFactory;
private static final MetricNameFactory globalFactory = new AllTableMetricNameFactory("Table");
@@ -621,6 +624,7 @@ public class TableMetrics
rowCacheHitOutOfRange = createTableCounter("RowCacheHitOutOfRange");
rowCacheHit = createTableCounter("RowCacheHit");
rowCacheMiss = createTableCounter("RowCacheMiss");
+ droppedMutations = createTableCounter("DroppedMutations");
casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 76f4967..2bfa46c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -953,6 +953,15 @@ public final class MessagingService implements MessagingServiceMBean
return versions.containsKey(endpoint);
}
+ public void incrementDroppedMutations(Optional<IMutation> mutationOpt, long timeTaken)
+ {
+ if (mutationOpt.isPresent())
+ {
+ updateDroppedMutationCount(mutationOpt.get());
+ }
+ incrementDroppedMessages(Verb.MUTATION, timeTaken);
+ }
+
public void incrementDroppedMessages(Verb verb)
{
incrementDroppedMessages(verb, false);
@@ -965,6 +974,10 @@ public final class MessagingService implements MessagingServiceMBean
public void incrementDroppedMessages(MessageIn message, long timeTaken)
{
+ if (message.payload instanceof IMutation)
+ {
+ updateDroppedMutationCount((IMutation) message.payload);
+ }
incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
}
@@ -980,6 +993,20 @@ public final class MessagingService implements MessagingServiceMBean
incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
}
+ private void updateDroppedMutationCount(IMutation mutation)
+ {
+ assert mutation != null : "Mutation should not be null when updating dropped mutations count";
+
+ for (UUID columnFamilyId : mutation.getColumnFamilyIds())
+ {
+ ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(columnFamilyId);
+ if (cfs != null)
+ {
+ cfs.metric.droppedMutations.inc();
+ }
+ }
+ }
+
private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
{
if (isCrossNodeTimeout)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 27da486..77e65ec 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -944,7 +944,7 @@ public class StorageProxy implements StorageProxyMBean
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
if (canDoLocalRequest(target))
- performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
+ performLocally(Stage.MUTATION, Optional.empty(), () -> BatchlogManager.store(batch), handler);
else
MessagingService.instance().sendRR(message, target, handler);
}
@@ -1233,7 +1233,7 @@ public class StorageProxy implements StorageProxyMBean
submitHint(mutation, endpointsToHint, responseHandler);
if (insertLocal)
- performLocally(stage, mutation::apply, responseHandler);
+ performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
if (dcGroups != null)
{
@@ -1322,9 +1322,9 @@ public class StorageProxy implements StorageProxyMBean
});
}
- private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+ private static void performLocally(Stage stage, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
{
- StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+ StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
{
public void runMayThrow()
{
@@ -2473,6 +2473,18 @@ public class StorageProxy implements StorageProxyMBean
{
private final long constructionTime = System.currentTimeMillis();
+ private final Optional<IMutation> mutationOpt;
+
+ public LocalMutationRunnable(Optional<IMutation> mutationOpt)
+ {
+ this.mutationOpt = mutationOpt;
+ }
+
+ public LocalMutationRunnable()
+ {
+ this.mutationOpt = Optional.empty();
+ }
+
public final void run()
{
final MessagingService.Verb verb = verb();
@@ -2481,7 +2493,7 @@ public class StorageProxy implements StorageProxyMBean
if (timeTaken > mutationTimeout)
{
if (MessagingService.DROPPABLE_VERBS.contains(verb))
- MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
+ MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken);
HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
{
protected void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index a8d23ca..2bc516a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1227,6 +1227,7 @@ public class NodeProbe implements AutoCloseable
case "WriteTotalLatency":
case "ReadTotalLatency":
case "PendingFlushes":
+ case "DroppedMutations":
return JMX.newMBeanProxy(mbeanServerConn, oName, CassandraMetricsRegistry.JmxCounterMBean.class).getCount();
case "CoordinatorReadLatency":
case "CoordinatorScanLatency":
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
index fe664ff..681af5b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
@@ -219,6 +219,7 @@ public class TableStats extends NodeToolCmd
histogram = (CassandraMetricsRegistry.JmxHistogramMBean) probe.getColumnFamilyMetric(keyspaceName, tableName, "TombstoneScannedHistogram");
System.out.println("\t\tAverage tombstones per slice (last five minutes): " + histogram.getMean());
System.out.println("\t\tMaximum tombstones per slice (last five minutes): " + histogram.getMax());
+ System.out.println("\t\tDropped Mutations: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, tableName, "DroppedMutations"), humanReadable));
System.out.println("");
}