You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2010/07/16 21:38:33 UTC
svn commit: r964922 - in /cassandra/trunk/src/java/org/apache/cassandra/net:
MessageDeserializationTask.java MessagingService.java
Author: brandonwilliams
Date: Fri Jul 16 19:38:33 2010
New Revision: 964922
URL: http://svn.apache.org/viewvc?rev=964922&view=rev
Log:
Log dropped message summary periodically. Patch by brandonwilliams; reviewed by jbellis for CASSANDRA-1284
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=964922&r1=964921&r2=964922&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Fri Jul 16 19:38:33 2010
@@ -45,8 +45,7 @@ class MessageDeserializationTask extends
{
if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
- logger.warn(String.format("dropping message (%,dms past timeout)",
- System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+ MessagingService.incrementDroppedMessages();
return;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=964922&r1=964921&r2=964922&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 16 19:38:33 2010
@@ -48,9 +48,12 @@ import java.nio.channels.ServerSocketCha
import java.security.MessageDigest;
import java.util.EnumMap;
import java.util.Map;
+import java.util.TimerTask;
+import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class MessagingService implements IFailureDetectionEventListener
{
@@ -77,11 +80,13 @@ public class MessagingService implements
private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
private static Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
-
+ private static int LOG_DROPPED_INTERVAL_IN_MS = 1000;
+
public static final MessagingService instance = new MessagingService();
private SocketThread socketThread;
private SimpleCondition listenGate;
+ private static AtomicInteger droppedMessages = new AtomicInteger();
public Object clone() throws CloneNotSupportedException
{
@@ -111,6 +116,15 @@ public class MessagingService implements
new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+ TimerTask logDropped = new TimerTask()
+ {
+ public void run()
+ {
+ logDroppedMessages();
+ }
+ };
+ Timer timer = new Timer("DroppedMessagesLogger");
+ timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
}
public byte[] hash(String type, byte data[])
@@ -486,7 +500,19 @@ public class MessagingService implements
buffer.flip();
return buffer;
}
-
+
+ public static int incrementDroppedMessages() // records one dropped message
+ {
+     return droppedMessages.addAndGet(1); // value of the shared counter after this drop is counted
+ }
+
+ private static void logDroppedMessages() // warns with the drop count accumulated since the previous call, if any
+ {
+     int dropped = droppedMessages.getAndSet(0); // atomic read-and-reset: a concurrent increment is never lost between the read and the reset
+     if (dropped > 0)
+         logger_.warn("Dropped " + dropped + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
+ }
+
private class SocketThread extends Thread
{
private final ServerSocket server;