You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/07/24 22:51:43 UTC

[02/15] cassandra git commit: Log when messages are dropped due to cross_node_timeout

Log when messages are dropped due to cross_node_timeout

Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-9793


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/573a1d11
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/573a1d11
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/573a1d11

Branch: refs/heads/trunk
Commit: 573a1d115b86abbe3fb53ff930464d7d8fd95600
Parents: 704ca66
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jul 24 15:41:31 2015 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jul 24 15:41:31 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/net/IncomingTcpConnection.java    |   9 +-
 .../cassandra/net/MessageDeliveryTask.java      |  10 +-
 .../apache/cassandra/net/MessagingService.java  | 153 +++++++++++++------
 .../cassandra/net/MessagingServiceTest.java     |  39 +++++
 5 files changed, 157 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f1e855e..12af151 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
  * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)
  * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
  * Don't include auth credentials in debug log (CASSANDRA-9682)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 4817c75..16ca121 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -158,10 +158,15 @@ public class IncomingTcpConnection extends Thread implements Closeable
             id = input.readInt();
 
         long timestamp = System.currentTimeMillis();
+        boolean isCrossNodeTimestamp = false;
         // make sure to readInt, even if cross_node_to is not enabled
         int partial = input.readInt();
         if (DatabaseDescriptor.hasCrossNodeTimeout())
-            timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+        {
+            long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+            isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+            timestamp = crossNodeTimestamp;
+        }
 
         MessageIn message = MessageIn.read(input, version, id);
         if (message == null)
@@ -171,7 +176,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
         if (version <= MessagingService.current_version)
         {
-            MessagingService.instance().receive(message, id, timestamp);
+            MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 982f17e..06caf94 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -29,15 +29,17 @@ public class MessageDeliveryTask implements Runnable
     private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
 
     private final MessageIn message;
-    private final long constructionTime;
     private final int id;
+    private final long constructionTime;
+    private final boolean isCrossNodeTimestamp;
 
-    public MessageDeliveryTask(MessageIn message, int id, long timestamp)
+    public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
     {
         assert message != null;
         this.message = message;
         this.id = id;
-        constructionTime = timestamp;
+        this.constructionTime = timestamp;
+        this.isCrossNodeTimestamp = isCrossNodeTimestamp;
     }
 
     public void run()
@@ -46,7 +48,7 @@ public class MessageDeliveryTask implements Runnable
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
             && System.currentTimeMillis() > constructionTime + message.getTimeout())
         {
-            MessagingService.instance().incrementDroppedMessages(verb);
+            MessagingService.instance().incrementDroppedMessages(verb, isCrossNodeTimestamp);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ee6b87b..b02680d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -118,7 +119,7 @@ public final class MessagingService implements MessagingServiceMBean
         SNAPSHOT, // Similar to nt snapshot
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
-        _TRACE, // dummy verb so we can use MS.droppedMessages
+        _TRACE, // dummy verb so we can use MS.droppedMessagesMap
         ECHO,
         REPAIR_MESSAGE,
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
@@ -291,10 +292,23 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.PAGED_RANGE,
                                                                    Verb.REQUEST_RESPONSE);
 
+
+    private static final class DroppedMessages
+    {
+        final DroppedMessageMetrics metrics;
+        final AtomicInteger droppedInternalTimeout;
+        final AtomicInteger droppedCrossNodeTimeout;
+
+        DroppedMessages(Verb verb)
+        {
+            this.metrics = new DroppedMessageMetrics(verb);
+            this.droppedInternalTimeout = new AtomicInteger(0);
+            this.droppedCrossNodeTimeout = new AtomicInteger(0);
+        }
+
+    }
     // total dropped message counts for server lifetime
-    private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
-    // dropped count when last requested for the Recent api.  high concurrency isn't necessary here.
-    private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
+    private final Map<Verb, DroppedMessages> droppedMessagesMap = new EnumMap<>(Verb.class);
 
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 
@@ -303,31 +317,43 @@ public final class MessagingService implements MessagingServiceMBean
 
     private static class MSHandle
     {
-        public static final MessagingService instance = new MessagingService();
+        public static final MessagingService instance = new MessagingService(false);
     }
+
     public static MessagingService instance()
     {
         return MSHandle.instance;
     }
 
-    private MessagingService()
+    private static class MSTestHandle
+    {
+        public static final MessagingService instance = new MessagingService(true);
+    }
+
+    static MessagingService test()
+    {
+        return MSTestHandle.instance;
+    }
+
+    private MessagingService(boolean testOnly)
     {
         for (Verb verb : DROPPABLE_VERBS)
-        {
-            droppedMessages.put(verb, new DroppedMessageMetrics(verb));
-            lastDroppedInternal.put(verb, 0);
-        }
+            droppedMessagesMap.put(verb, new DroppedMessages(verb));
 
         listenGate = new SimpleCondition();
-        verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
-        Runnable logDropped = new Runnable()
+        verbHandlers = new EnumMap<>(Verb.class);
+
+        if (!testOnly)
         {
-            public void run()
+            Runnable logDropped = new Runnable()
             {
-                logDroppedMessages();
-            }
-        };
-        StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+                public void run()
+                {
+                    logDroppedMessages();
+                }
+            };
+            StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        }
 
         Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
         {
@@ -357,16 +383,19 @@ public final class MessagingService implements MessagingServiceMBean
             }
         };
 
-        callbacks = new ExpiringMap<Integer, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
+        callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
+        if (!testOnly)
         {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            try
+            {
+                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -718,7 +747,7 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
-    public void receive(MessageIn message, int id, long timestamp)
+    public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
     {
         TraceState state = Tracing.instance.initializeFromMessage(message);
         if (state != null)
@@ -732,7 +761,7 @@ public final class MessagingService implements MessagingServiceMBean
             return;
         }
 
-        Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
+        Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
@@ -844,8 +873,13 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void incrementDroppedMessages(Verb verb)
     {
+        incrementDroppedMessages(verb, false);
+    }
+
+    public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
+    {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
-        droppedMessages.get(verb).dropped.mark();
+        incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
     }
 
     /**
@@ -853,34 +887,55 @@ public final class MessagingService implements MessagingServiceMBean
      */
     private void incrementRejectedMessages(Verb verb)
     {
-        DroppedMessageMetrics metrics = droppedMessages.get(verb);
-        if (metrics == null)
+        DroppedMessages droppedMessages = droppedMessagesMap.get(verb);
+        if (droppedMessages == null)
         {
-            metrics = new DroppedMessageMetrics(verb);
-            droppedMessages.put(verb, metrics);
+            droppedMessages = new DroppedMessages(verb);
+            droppedMessagesMap.put(verb, droppedMessages);
         }
-        metrics.dropped.mark();
+        incrementDroppedMessages(droppedMessagesMap.get(verb), false);
+    }
+
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
+    {
+        droppedMessages.metrics.dropped.mark();
+        if (isCrossNodeTimeout)
+            droppedMessages.droppedCrossNodeTimeout.incrementAndGet();
+        else
+            droppedMessages.droppedInternalTimeout.incrementAndGet();
     }
 
     private void logDroppedMessages()
     {
-        boolean logTpstats = false;
-        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+        List<String> logs = getDroppedMessagesLogs();
+        for (String log : logs)
+            logger.error(log);
+
+        if (logs.size() > 0)
+            StatusLogger.log();
+    }
+
+    @VisibleForTesting
+    List<String> getDroppedMessagesLogs()
+    {
+        List<String> ret = new ArrayList<>();
+        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
         {
-            int dropped = (int) entry.getValue().dropped.count();
             Verb verb = entry.getKey();
-            int recent = dropped - lastDroppedInternal.get(verb);
-            if (recent > 0)
+            DroppedMessages droppedMessages = entry.getValue();
+
+            int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0);
+            int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
+            if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
             {
-                logTpstats = true;
-                logger.info("{} {} messages dropped in last {}ms",
-                             new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
-                lastDroppedInternal.put(verb, dropped);
+                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
+                                      verb,
+                                      LOG_DROPPED_INTERVAL_IN_MS,
+                                      droppedInternalTimeout,
+                                      droppedCrossNodeTimeout));
             }
         }
-
-        if (logTpstats)
-            StatusLogger.log();
+        return ret;
     }
 
     private static class SocketThread extends Thread
@@ -1011,16 +1066,16 @@ public final class MessagingService implements MessagingServiceMBean
     public Map<String, Integer> getDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, Integer>();
-        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
-            map.put(entry.getKey().toString(), (int) entry.getValue().dropped.count());
+        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
+            map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.count());
         return map;
     }
 
     public Map<String, Integer> getRecentlyDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<String, Integer>();
-        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
-            map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped());
+        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
+            map.put(entry.getKey().toString(), entry.getValue().metrics.getRecentlyDropped());
         return map;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
new file mode 100644
index 0000000..04dacf3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -0,0 +1,39 @@
+package org.apache.cassandra.net;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MessagingServiceTest
+{
+    private final MessagingService messagingService = MessagingService.test();
+
+    @Test
+    public void testDroppedMessages()
+    {
+        MessagingService.Verb verb = MessagingService.Verb.READ;
+
+        for (int i = 0; i < 5000; i++)
+            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+        List<String> logs = messagingService.getDroppedMessagesLogs();
+        assertEquals(1, logs.size());
+        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+        assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
+        assertEquals(5000, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString()));
+
+        logs = messagingService.getDroppedMessagesLogs();
+        assertEquals(0, logs.size());
+
+        for (int i = 0; i < 2500; i++)
+            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+        logs = messagingService.getDroppedMessagesLogs();
+        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+        assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
+        assertEquals(2500, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString()));
+    }
+
+}