You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/16 21:38:33 UTC

svn commit: r964922 - in /cassandra/trunk/src/java/org/apache/cassandra/net: MessageDeserializationTask.java MessagingService.java

Author: brandonwilliams
Date: Fri Jul 16 19:38:33 2010
New Revision: 964922

URL: http://svn.apache.org/viewvc?rev=964922&view=rev
Log:
Log dropped message summary periodically.  Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-1284

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=964922&r1=964921&r2=964922&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Fri Jul 16 19:38:33 2010
@@ -45,8 +45,7 @@ class MessageDeserializationTask extends
     {
         if (System.currentTimeMillis() >  constructionTime + DatabaseDescriptor.getRpcTimeout())
         {
-            logger.warn(String.format("dropping message (%,dms past timeout)",
-                                      System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+            MessagingService.incrementDroppedMessages();
             return;
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=964922&r1=964921&r2=964922&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 16 19:38:33 2010
@@ -48,9 +48,12 @@ import java.nio.channels.ServerSocketCha
 import java.security.MessageDigest;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.TimerTask;
+import java.util.Timer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MessagingService implements IFailureDetectionEventListener
 {
@@ -77,11 +80,13 @@ public class MessagingService implements
     private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
     
     private static Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
-    
+    private static int LOG_DROPPED_INTERVAL_IN_MS = 1000;
+
     public static final MessagingService instance = new MessagingService();
 
     private SocketThread socketThread;
     private SimpleCondition listenGate;
+    private static AtomicInteger droppedMessages = new AtomicInteger();
 
     public Object clone() throws CloneNotSupportedException
     {
@@ -111,6 +116,15 @@ public class MessagingService implements
                                                                         new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+        TimerTask logDropped = new TimerTask()
+        {
+            public void run()
+            {
+                logDroppedMessages();
+            }
+        };
+        Timer timer = new Timer("DroppedMessagesLogger");
+        timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
     }
 
     public byte[] hash(String type, byte data[])
@@ -486,7 +500,19 @@ public class MessagingService implements
         buffer.flip();
         return buffer;
     }
-    
+
+    public static int incrementDroppedMessages()
+    {
+        return droppedMessages.incrementAndGet();
+    }
+               
+    private static void logDroppedMessages()
+    {
+        if (droppedMessages.get() > 0)
+            logger_.warn("Dropped " + droppedMessages + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
+        droppedMessages.set(0);
+    }
+
     private class SocketThread extends Thread
     {
         private final ServerSocket server;