You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/05 16:16:02 UTC
svn commit: r982630 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/net/MessageDeserializationTask.java
src/java/org/apache/cassandra/net/MessagingService.java
Author: jbellis
Date: Thu Aug 5 14:16:02 2010
New Revision: 982630
URL: http://svn.apache.org/viewvc?rev=982630&view=rev
Log:
backport CASSANDRA-1284/r964922 from trunk
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Aug 5 14:16:02 2010
@@ -5,6 +5,7 @@
* page within a single row during hinted handoff (CASSANDRA-1327)
* fix compilation on non-sun JDKs (CASSANDRA-1061)
* remove String.trim() call on row keys in batch mutations (CASSANDRA-1235)
+ * Log summary of dropped messages instead of spamming log (CASSANDRA-1284)
0.6.4
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessageDeserializationTask.java Thu Aug 5 14:16:02 2010
@@ -44,8 +44,7 @@ class MessageDeserializationTask extends
{
if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
{
- logger.warn(String.format("dropping message (%,dms past timeout)",
- System.currentTimeMillis() - (constructionTime + DatabaseDescriptor.getRpcTimeout())));
+ MessagingService.incrementDroppedMessages();
return;
}
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=982630&r1=982629&r2=982630&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 5 14:16:02 2010
@@ -45,10 +45,13 @@ import java.nio.channels.ServerSocketCha
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class MessagingService
{
@@ -75,11 +78,13 @@ public class MessagingService
private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
private static Logger logger_ = Logger.getLogger(MessagingService.class);
+ private static int LOG_DROPPED_INTERVAL_IN_MS = 1000;
public static final MessagingService instance = new MessagingService();
private SocketThread socketThread;
private SimpleCondition listenGate;
+ private static AtomicInteger droppedMessages = new AtomicInteger();
public Object clone() throws CloneNotSupportedException
{
@@ -109,6 +114,15 @@ public class MessagingService
new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
+ TimerTask logDropped = new TimerTask()
+ {
+ public void run()
+ {
+ logDroppedMessages();
+ }
+ };
+ Timer timer = new Timer("DroppedMessagesLogger");
+ timer.schedule(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS);
}
public byte[] hash(String type, byte data[])
@@ -477,7 +491,19 @@ public class MessagingService
buffer.flip();
return buffer;
}
-
+
+ /**
+ * Bumps the shared dropped-message counter by one.
+ *
+ * @return the counter value immediately after this increment
+ */
+ public static int incrementDroppedMessages()
+ {
+ final int updatedCount = droppedMessages.incrementAndGet();
+ return updatedCount;
+ }
+
+ /**
+ * Logs a one-line summary of how many messages were dropped since the
+ * previous call, then starts a fresh count. Nothing is logged when no
+ * message was dropped.
+ *
+ * The counter is read and cleared in a single atomic getAndSet(0) so the
+ * value that is tested, the value that is logged and the value that is
+ * cleared are the same snapshot. With a separate get() followed by set(0),
+ * increments arriving in between would be discarded without being reported.
+ */
+ private static void logDroppedMessages()
+ {
+ int dropped = droppedMessages.getAndSet(0);
+ if (dropped > 0)
+ {
+ logger_.warn("Dropped " + dropped + " messages in the last " + LOG_DROPPED_INTERVAL_IN_MS + "ms");
+ }
+ }
+
private class SocketThread extends Thread
{
private final ServerSocket server;