You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/28 20:24:35 UTC

svn commit: r1028415 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/net/MessageDeliveryTask.java src/java/org/apache/cassandra/net/MessagingService.java

Author: jbellis
Date: Thu Oct 28 18:24:34 2010
New Revision: 1028415

URL: http://svn.apache.org/viewvc?rev=1028415&view=rev
Log:
log type of dropped messages.  patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1677

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Oct 28 18:24:34 2010
@@ -2,6 +2,7 @@ dev
  * quorum read optimization (CASSANDRA-1622)
  * update GC settings in cassandra.bat (CASSANDRA-1636)
  * fix hinted handoff replay (CASSANDRA-1656)
+ * log type of dropped messages (CASSANDRA-1677)
 
 
 0.6.6

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Thu Oct 28 18:24:34 2010
@@ -37,13 +37,14 @@ public class MessageDeliveryTask impleme
     
     public void run()
     { 
+        StorageService.Verb verb = message_.getVerb();
+
         if (System.currentTimeMillis() >  constructionTime_ + DatabaseDescriptor.getRpcTimeout())
         {
-            MessagingService.incrementDroppedMessages();
+            MessagingService.incrementDroppedMessages(verb);
             return;
         }
 
-        StorageService.Verb verb = message_.getVerb();
         IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;
         verbHandler.doVerb(message_);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Oct 28 18:24:34 2010
@@ -28,10 +28,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -81,7 +78,12 @@ public class MessagingService
 
     private SocketThread socketThread;
     private SimpleCondition listenGate;
-    private static AtomicInteger droppedMessages = new AtomicInteger();
+    private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+    static
+    {
+        for (StorageService.Verb verb : StorageService.Verb.values())
+            droppedMessages.put(verb, new AtomicInteger());
+    }
 
     public Object clone() throws CloneNotSupportedException
     {
@@ -490,16 +492,23 @@ public class MessagingService
         return buffer;
     }
 
-    public static int incrementDroppedMessages()
+    public static int incrementDroppedMessages(StorageService.Verb verb)
     {
-        return droppedMessages.incrementAndGet();
+        return droppedMessages.get(verb).incrementAndGet();
     }
                
     private static void logDroppedMessages()
     {
-        if (droppedMessages.get() > 0)
-            logger_.warn("Dropped " + droppedMessages + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
-        droppedMessages.set(0);
+        for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+        {
+            AtomicInteger dropped = entry.getValue();
+            if (dropped.get() > 0)
+            {
+                logger_.warn(String.format("Dropped %s %s messages in the last %sms",
+                                           dropped, entry.getKey(), LOG_DROPPED_INTERVAL_IN_MS));
+            }
+            dropped.set(0);
+        }
     }
 
     private class SocketThread extends Thread