You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/24 14:03:58 UTC

[1/2] cassandra git commit: Revert "Add latency logging for dropped messages"

Repository: cassandra
Updated Branches:
  refs/heads/trunk bb25f5bdd -> c9ef25fd8


Revert "Add latency logging for dropped messages"

This reverts commit 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd5c8bbc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd5c8bbc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd5c8bbc

Branch: refs/heads/trunk
Commit: bd5c8bbc04e017089743b27cce55635dac00b98e
Parents: bb25f5b
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Dec 24 07:59:31 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:59:31 2015 -0500

----------------------------------------------------------------------
 .../cassandra/net/MessageDeliveryTask.java      | 42 ++-----------------
 .../apache/cassandra/service/StorageProxy.java  | 44 ++------------------
 2 files changed, 7 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index bede3d8..818cfc6 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,13 +18,11 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.IMutation;
+
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -45,11 +43,10 @@ public class MessageDeliveryTask implements Runnable
 
     public void run()
     {
-        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
         MessagingService.Verb verb = message.verb;
-        if (MessagingService.DROPPABLE_VERBS.contains(verb)&& message.getTimeout() > timeTaken)
+        if (MessagingService.DROPPABLE_VERBS.contains(verb)
+            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
         {
-            LogDroppedMessageDetails(timeTaken);
             MessagingService.instance().incrementDroppedMessages(message);
             return;
         }
@@ -85,37 +82,6 @@ public class MessageDeliveryTask implements Runnable
             Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
     }
 
-    private void LogDroppedMessageDetails(long timeTaken)
-    {
-        logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}",
-                timeTaken, message.getTimeout(), message.toString());
-        // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements)
-        IMutation mutation;
-        if (message.payload instanceof IMutation)
-        {
-            mutation = (IMutation)message.payload;
-            if (mutation != null)
-            {
-                logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
-            }
-        }
-        else if (message.payload instanceof Collection<?>)
-        {
-            Collection<?> payloadItems = (Collection<?>)message.payload;
-            for (Object payloadItem : payloadItems)
-            {
-                if (payloadItem instanceof IMutation)
-                {
-                    mutation = (IMutation)payloadItem;
-                    if (mutation != null)
-                    {
-                        logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
-                    }
-                }
-            }
-        }
-    }
-
     private void handleFailure(Throwable t)
     {
         if (message.doCallbackOnFailure())
@@ -129,4 +95,4 @@ public class MessageDeliveryTask implements Runnable
     private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                   MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                   MessagingService.Verb.GOSSIP_DIGEST_SYN);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1c30cd7..f161607 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            performLocally(stage, mutation, mutation::apply, responseHandler);
+            performLocally(stage, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1286,27 +1286,6 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
-    {
-        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
-        {
-            public void runMayThrow()
-            {
-                try
-                {
-                    runnable.run();
-                    handler.response(null);
-                }
-                catch (Exception ex)
-                {
-                    if (!(ex instanceof WriteTimeoutException))
-                        logger.error("Failed to apply mutation locally : {}", ex);
-                    handler.onFailure(FBUtilities.getBroadcastAddress());
-                }
-            }
-        });
-    }
-
     /**
      * Handle counter mutation on the coordinator host.
      *
@@ -2429,28 +2408,11 @@ public class StorageProxy implements StorageProxyMBean
     private static abstract class LocalMutationRunnable implements Runnable
     {
         private final long constructionTime = System.currentTimeMillis();
-        private IMutation mutation;
-
-        public LocalMutationRunnable(IMutation mutation)
-        {
-            this.mutation = mutation;
-        }
-
-        public LocalMutationRunnable()
-        {
-        }
 
         public final void run()
         {
-            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
-            if (System.currentTimeMillis() > constructionTime + mutationTimeout)
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
             {
-                long timeTaken = System.currentTimeMillis() - constructionTime;
-                logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout);
-                if (this.mutation != null)
-                {
-                    logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray()));
-                }
                 MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
@@ -2634,4 +2596,4 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
-}
\ No newline at end of file
+}


[2/2] cassandra git commit: Add latency logging for dropped messages

Posted by jm...@apache.org.
Add latency logging for dropped messages

Patch by akale; reviewed by pmotta for CASSANDRA-10580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ef25fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ef25fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ef25fd

Branch: refs/heads/trunk
Commit: c9ef25fd81501005b6484baf064081efc557f3f4
Parents: bd5c8bb
Author: anubhavkale <an...@microsoft.com>
Authored: Tue Dec 15 21:39:16 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 08:01:53 2015 -0500

----------------------------------------------------------------------
 .../cassandra/db/ReadCommandVerbHandler.java    |  2 +-
 .../metrics/DroppedMessageMetrics.java          | 10 ++++++
 .../cassandra/net/MessageDeliveryTask.java      |  5 +--
 .../apache/cassandra/net/MessagingService.java  | 37 ++++++++++++++++----
 .../apache/cassandra/service/StorageProxy.java  | 13 ++++---
 .../cassandra/net/MessagingServiceTest.java     | 10 +++---
 6 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 9eaa8fa..b2fb876 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         if (!command.complete())
         {
             Tracing.trace("Discarding partial response to {} (timed out)", message.from);
-            MessagingService.instance().incrementDroppedMessages(message);
+            MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
index 58c80fb..2a94c9f 100644
--- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.metrics;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
 import org.apache.cassandra.net.MessagingService;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -30,9 +32,17 @@ public class DroppedMessageMetrics
     /** Number of dropped messages */
     public final Meter dropped;
 
+    /** The dropped latency within node */
+    public final Timer internalDroppedLatency;
+
+    /** The cross node dropped latency */
+    public final Timer crossNodeDroppedLatency;
+
     public DroppedMessageMetrics(MessagingService.Verb verb)
     {
         MetricNameFactory factory = new DefaultNameFactory("DroppedMessage", verb.toString());
         dropped = Metrics.meter(factory.createMetricName("Dropped"));
+        internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
+        crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 818cfc6..d9f8b7c 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -44,10 +44,11 @@ public class MessageDeliveryTask implements Runnable
     public void run()
     {
         MessagingService.Verb verb = message.verb;
+        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
+            && timeTaken > message.getTimeout())
         {
-            MessagingService.instance().incrementDroppedMessages(message);
+            MessagingService.instance().incrementDroppedMessages(message, timeTaken);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fab082a..d95c49b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -957,9 +957,20 @@ public final class MessagingService implements MessagingServiceMBean
         incrementDroppedMessages(verb, false);
     }
 
-    public void incrementDroppedMessages(MessageIn message)
+    public void incrementDroppedMessages(Verb verb, long timeTaken)
     {
-        incrementDroppedMessages(message.verb, message.constructionTime.isCrossNode);
+        incrementDroppedMessages(verb, timeTaken, false);
+    }
+
+    public void incrementDroppedMessages(MessageIn message, long timeTaken)
+    {
+        incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
+    }
+
+    public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout)
+    {
+        assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
+        incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout);
     }
 
     public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
@@ -968,6 +979,15 @@ public final class MessagingService implements MessagingServiceMBean
         incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
     }
 
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
+    {
+        if (isCrossNodeTimeout)
+            droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+        else
+            droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+        incrementDroppedMessages(droppedMessages, isCrossNodeTimeout);
+    }
+
     private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
     {
         droppedMessages.metrics.dropped.mark();
@@ -1000,11 +1020,14 @@ public final class MessagingService implements MessagingServiceMBean
             int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
             if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
             {
-                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
-                                      verb,
-                                      LOG_DROPPED_INTERVAL_IN_MS,
-                                      droppedInternalTimeout,
-                                      droppedCrossNodeTimeout));
+                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout."
+                                     + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms",
+                                     verb,
+                                     LOG_DROPPED_INTERVAL_IN_MS,
+                                     droppedInternalTimeout,
+                                     droppedCrossNodeTimeout,
+                                     TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()),
+                                     TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
             }
         }
         return ret;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f161607..2e32f16 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1732,7 +1732,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    MessagingService.instance().incrementDroppedMessages(verb);
+                    MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime);
                     handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
 
@@ -2383,9 +2383,10 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + timeout)
+            long timeTaken = System.currentTimeMillis() - constructionTime;
+            if (timeTaken > timeout)
             {
-                MessagingService.instance().incrementDroppedMessages(verb);
+                MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
                 return;
             }
             try
@@ -2411,9 +2412,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+            long timeTaken = System.currentTimeMillis() - constructionTime;
+            if (timeTaken > mutationTimeout)
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION, timeTaken);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
                     protected void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 75c146e..3b9c957 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -15,22 +15,22 @@ public class MessagingServiceTest
     {
         MessagingService.Verb verb = MessagingService.Verb.READ;
 
-        for (int i = 0; i < 5000; i++)
-            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+        for (int i = 1; i <= 5000; i++)
+            messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
 
         List<String> logs = messagingService.getDroppedMessagesLogs();
         assertEquals(1, logs.size());
-        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0));
         assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
 
         logs = messagingService.getDroppedMessagesLogs();
         assertEquals(0, logs.size());
 
         for (int i = 0; i < 2500; i++)
-            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+            messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
 
         logs = messagingService.getDroppedMessagesLogs();
-        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0));
         assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
     }