You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/05 16:16:02 UTC

svn commit: r982630 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/net/MessageDeserializationTask.java src/java/org/apache/cassandra/net/MessagingService.java

Author: jbellis
Date: Thu Aug  5 14:16:02 2010
New Revision: 982630

URL: http://svn.apache.org/viewvc?rev=982630&view=rev
Log:
backport CASSANDRA-1284/r964922 from trunk

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Aug  5 14:16:02 2010
@@ -5,6 +5,7 @@
  * page within a single row during hinted handoff (CASSANDRA-1327)
  * fix compilation on non-sun JKDs (CASSANDRA-1061)
  * remove String.trim() call on row keys in batch mutations (CASSANDRA-1235)
+ * Log summary of dropped messages instead of spamming log (CASSANDRA-1284)
 
 
 0.6.4

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Thu Aug  5 14:16:02 2010
@@ -44,8 +44,7 @@ class MessageDeserializationTask extends
     {
         if (System.currentTimeMillis() >  constructionTime + DatabaseDescriptor.getRpcTimeout())
         {
-            logger.warn(String.format("dropping message (%,dms past timeout)",
-                                      System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+            MessagingService.incrementDroppedMessages();
             return;
         }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug  5 14:16:02 2010
@@ -45,10 +45,13 @@ import java.nio.channels.ServerSocketCha
 import java.security.MessageDigest;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MessagingService
 {
@@ -75,11 +78,13 @@ public class MessagingService
     private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
     
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
+    private static int LOG_DROPPED_INTERVAL_IN_MS = 1000;
     
     public static final MessagingService instance = new MessagingService();
 
     private SocketThread socketThread;
     private SimpleCondition listenGate;
+    private static AtomicInteger droppedMessages = new AtomicInteger();
 
     public Object clone() throws CloneNotSupportedException
     {
@@ -109,6 +114,15 @@ public class MessagingService
                                                                         new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+        TimerTask logDropped = new TimerTask()
+        {
+            public void run()
+            {
+                logDroppedMessages();
+            }
+        };
+        Timer timer = new Timer("DroppedMessagesLogger");
+        timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
     }
 
     public byte[] hash(String type, byte data[])
@@ -477,7 +491,19 @@ public class MessagingService
         buffer.flip();
         return buffer;
     }
-    
+
+    public static int incrementDroppedMessages() // bumps the shared dropped-message counter and returns the updated value
+    {
+        return droppedMessages.addAndGet(1);
+    }
+               
+    private static void logDroppedMessages() // run by the DroppedMessagesLogger timer task: logs and clears the dropped-message count
+    {
+        int dropped = droppedMessages.getAndSet(0); // atomic read+reset: a separate get() then set(0) loses increments made in between
+        if (dropped > 0)
+            logger_.warn("Dropped " + dropped + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
+    }
+
     private class SocketThread extends Thread
     {
         private final ServerSocket server;