You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/01/15 16:23:44 UTC

cassandra git commit: Add metric for number of dropped mutations

Repository: cassandra
Updated Branches:
  refs/heads/trunk 6148f5214 -> 66d3428e3


Add metric for number of dropped mutations

patch by Anubhav Kale; reviewed by Paulo Motta for CASSANDRA-10866


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66d3428e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66d3428e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66d3428e

Branch: refs/heads/trunk
Commit: 66d3428e3fe64851fa7587ee69b53e20bb7c09b5
Parents: 6148f52
Author: anubhavkale <an...@microsoft.com>
Authored: Fri Jan 15 10:20:42 2016 -0500
Committer: Carl Yeksigian <ca...@apache.org>
Committed: Fri Jan 15 10:20:42 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  4 +++
 .../apache/cassandra/net/MessagingService.java  | 27 ++++++++++++++++++++
 .../apache/cassandra/service/StorageProxy.java  | 22 ++++++++++++----
 .../org/apache/cassandra/tools/NodeProbe.java   |  1 +
 .../cassandra/tools/nodetool/TableStats.java    |  1 +
 6 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61284f2..33b9f9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.4
+ * Add metric for number of dropped mutations (CASSANDRA-10866)
  * Simplify row cache invalidation code (CASSANDRA-10396)
  * Support user-defined compaction through nodetool (CASSANDRA-10660)
  * Stripe view locks by key and table ID to reduce contention (CASSANDRA-10981)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index c8c214e..6492833 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -142,6 +142,9 @@ public class TableMetrics
     /** Time spent waiting for free memtable space, either on- or off-heap */
     public final Histogram waitingOnFreeMemtableSpace;
 
+    /** Dropped Mutations Count */
+    public final Counter droppedMutations;
+
     private final MetricNameFactory factory;
     private final MetricNameFactory aliasFactory;
     private static final MetricNameFactory globalFactory = new AllTableMetricNameFactory("Table");
@@ -621,6 +624,7 @@ public class TableMetrics
         rowCacheHitOutOfRange = createTableCounter("RowCacheHitOutOfRange");
         rowCacheHit = createTableCounter("RowCacheHit");
         rowCacheMiss = createTableCounter("RowCacheMiss");
+        droppedMutations = createTableCounter("DroppedMutations");
 
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);
         casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 76f4967..2bfa46c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -953,6 +953,15 @@ public final class MessagingService implements MessagingServiceMBean
         return versions.containsKey(endpoint);
     }
 
+    public void incrementDroppedMutations(Optional<IMutation> mutationOpt, long timeTaken)
+    {
+        if (mutationOpt.isPresent())
+        {
+            updateDroppedMutationCount(mutationOpt.get());
+        }
+        incrementDroppedMessages(Verb.MUTATION, timeTaken);
+    }
+
     public void incrementDroppedMessages(Verb verb)
     {
         incrementDroppedMessages(verb, false);
@@ -965,6 +974,10 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void incrementDroppedMessages(MessageIn message, long timeTaken)
     {
+        if (message.payload instanceof IMutation)
+        {
+            updateDroppedMutationCount((IMutation) message.payload);
+        }
         incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
     }
 
@@ -980,6 +993,20 @@ public final class MessagingService implements MessagingServiceMBean
         incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
     }
 
+    private void updateDroppedMutationCount(IMutation mutation)
+    {
+        assert mutation != null : "Mutation should not be null when updating dropped mutations count";
+
+        for (UUID columnFamilyId : mutation.getColumnFamilyIds())
+        {
+            ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(columnFamilyId);
+            if (cfs != null)
+            {
+                cfs.metric.droppedMutations.inc();
+            }
+        }
+    }
+
     private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
     {
         if (isCrossNodeTimeout)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 27da486..77e65ec 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -944,7 +944,7 @@ public class StorageProxy implements StorageProxyMBean
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
 
             if (canDoLocalRequest(target))
-                performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
+                performLocally(Stage.MUTATION, Optional.empty(), () -> BatchlogManager.store(batch), handler);
             else
                 MessagingService.instance().sendRR(message, target, handler);
         }
@@ -1233,7 +1233,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            performLocally(stage, mutation::apply, responseHandler);
+            performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1322,9 +1322,9 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    private static void performLocally(Stage stage, Optional<IMutation> mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
     {
-        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
         {
             public void runMayThrow()
             {
@@ -2473,6 +2473,18 @@ public class StorageProxy implements StorageProxyMBean
     {
         private final long constructionTime = System.currentTimeMillis();
 
+        private final Optional<IMutation> mutationOpt;
+
+        public LocalMutationRunnable(Optional<IMutation> mutationOpt)
+        {
+            this.mutationOpt = mutationOpt;
+        }
+
+        public LocalMutationRunnable()
+        {
+            this.mutationOpt = Optional.empty();
+        }
+
         public final void run()
         {
             final MessagingService.Verb verb = verb();
@@ -2481,7 +2493,7 @@ public class StorageProxy implements StorageProxyMBean
             if (timeTaken > mutationTimeout)
             {
                 if (MessagingService.DROPPABLE_VERBS.contains(verb))
-                    MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
+                    MessagingService.instance().incrementDroppedMutations(mutationOpt, timeTaken);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index a8d23ca..2bc516a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1227,6 +1227,7 @@ public class NodeProbe implements AutoCloseable
                 case "WriteTotalLatency":
                 case "ReadTotalLatency":
                 case "PendingFlushes":
+                case "DroppedMutations":
                     return JMX.newMBeanProxy(mbeanServerConn, oName, CassandraMetricsRegistry.JmxCounterMBean.class).getCount();
                 case "CoordinatorReadLatency":
                 case "CoordinatorScanLatency":

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66d3428e/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
index fe664ff..681af5b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/TableStats.java
@@ -219,6 +219,7 @@ public class TableStats extends NodeToolCmd
                 histogram = (CassandraMetricsRegistry.JmxHistogramMBean) probe.getColumnFamilyMetric(keyspaceName, tableName, "TombstoneScannedHistogram");
                 System.out.println("\t\tAverage tombstones per slice (last five minutes): " + histogram.getMean());
                 System.out.println("\t\tMaximum tombstones per slice (last five minutes): " + histogram.getMax());
+                System.out.println("\t\tDropped Mutations: " + format((Long) probe.getColumnFamilyMetric(keyspaceName, tableName, "DroppedMutations"), humanReadable));
 
                 System.out.println("");
             }