You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/28 20:24:35 UTC
svn commit: r1028415 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/net/MessageDeliveryTask.java
src/java/org/apache/cassandra/net/MessagingService.java
Author: jbellis
Date: Thu Oct 28 18:24:34 2010
New Revision: 1028415
URL: http://svn.apache.org/viewvc?rev=1028415&view=rev
Log:
Log type of dropped messages. Patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1677.
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Oct 28 18:24:34 2010
@@ -2,6 +2,7 @@ dev
* quorum read optimization (CASSANDRA-1622)
* update GC settings in cassandra.bat (CASSANDRA-1636)
* fix hinted handoff replay (CASSANDRA-1656)
+ * log type of dropped messages (CASSANDRA-1677)
0.6.6
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Thu Oct 28 18:24:34 2010
@@ -37,13 +37,14 @@ public class MessageDeliveryTask impleme
public void run()
{
+ StorageService.Verb verb = message_.getVerb();
+
if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
{
- MessagingService.incrementDroppedMessages();
+ MessagingService.incrementDroppedMessages(verb);
return;
}
- StorageService.Verb verb = message_.getVerb();
IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
verbHandler.doVerb(message_);
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=1028415&r1=1028414&r2=1028415&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Oct 28 18:24:34 2010
@@ -28,10 +28,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -81,7 +78,12 @@ public class MessagingService
private SocketThread socketThread;
private SimpleCondition listenGate;
- private static AtomicInteger droppedMessages = new AtomicInteger();
+ private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+ static
+ {
+ for (StorageService.Verb verb : StorageService.Verb.values())
+ droppedMessages.put(verb, new AtomicInteger());
+ }
public Object clone() throws CloneNotSupportedException
{
@@ -490,16 +492,23 @@ public class MessagingService
return buffer;
}
- public static int incrementDroppedMessages()
+ public static int incrementDroppedMessages(StorageService.Verb verb)
{
- return droppedMessages.incrementAndGet();
+ return droppedMessages.get(verb).incrementAndGet();
}
private static void logDroppedMessages()
{
- if (droppedMessages.get() > 0)
- logger_.warn("Dropped " + droppedMessages + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
- droppedMessages.set(0);
+ for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())
+ {
+ AtomicInteger dropped = entry.getValue();
+ if (dropped.get() > 0)
+ {
+ logger_.warn(String.format("Dropped %s %s messages in the last %sms",
+ dropped, entry.getKey(), LOG_DROPPED_INTERVAL_IN_MS));
+ }
+ dropped.set(0);
+ }
}
private class SocketThread extends Thread