You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/07/24 22:51:44 UTC
[03/15] cassandra git commit: Log when messages are dropped due to cross_node_timeout
Log when messages are dropped due to cross_node_timeout
Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-9793
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/573a1d11
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/573a1d11
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/573a1d11
Branch: refs/heads/cassandra-2.1
Commit: 573a1d115b86abbe3fb53ff930464d7d8fd95600
Parents: 704ca66
Author: Brandon Williams <br...@apache.org>
Authored: Fri Jul 24 15:41:31 2015 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Fri Jul 24 15:41:31 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/net/IncomingTcpConnection.java | 9 +-
.../cassandra/net/MessageDeliveryTask.java | 10 +-
.../apache/cassandra/net/MessagingService.java | 153 +++++++++++++------
.../cassandra/net/MessagingServiceTest.java | 39 +++++
5 files changed, 157 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f1e855e..12af151 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.17
+ * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
* checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)
* Complete CASSANDRA-8448 fix (CASSANDRA-9519)
* Don't include auth credentials in debug log (CASSANDRA-9682)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 4817c75..16ca121 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -158,10 +158,15 @@ public class IncomingTcpConnection extends Thread implements Closeable
id = input.readInt();
long timestamp = System.currentTimeMillis();
+ boolean isCrossNodeTimestamp = false;
// make sure to readInt, even if cross_node_to is not enabled
int partial = input.readInt();
if (DatabaseDescriptor.hasCrossNodeTimeout())
- timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+ {
+ long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+ isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+ timestamp = crossNodeTimestamp;
+ }
MessageIn message = MessageIn.read(input, version, id);
if (message == null)
@@ -171,7 +176,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
}
if (version <= MessagingService.current_version)
{
- MessagingService.instance().receive(message, id, timestamp);
+ MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 982f17e..06caf94 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -29,15 +29,17 @@ public class MessageDeliveryTask implements Runnable
private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
private final MessageIn message;
- private final long constructionTime;
private final int id;
+ private final long constructionTime;
+ private final boolean isCrossNodeTimestamp;
- public MessageDeliveryTask(MessageIn message, int id, long timestamp)
+ public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
{
assert message != null;
this.message = message;
this.id = id;
- constructionTime = timestamp;
+ this.constructionTime = timestamp;
+ this.isCrossNodeTimestamp = isCrossNodeTimestamp;
}
public void run()
@@ -46,7 +48,7 @@ public class MessageDeliveryTask implements Runnable
if (MessagingService.DROPPABLE_VERBS.contains(verb)
&& System.currentTimeMillis() > constructionTime + message.getTimeout())
{
- MessagingService.instance().incrementDroppedMessages(verb);
+ MessagingService.instance().incrementDroppedMessages(verb, isCrossNodeTimestamp);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ee6b87b..b02680d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -118,7 +119,7 @@ public final class MessagingService implements MessagingServiceMBean
SNAPSHOT, // Similar to nt snapshot
MIGRATION_REQUEST,
GOSSIP_SHUTDOWN,
- _TRACE, // dummy verb so we can use MS.droppedMessages
+ _TRACE, // dummy verb so we can use MS.droppedMessagesMap
ECHO,
REPAIR_MESSAGE,
// use as padding for backwards compatability where a previous version needs to validate a verb from the future.
@@ -291,10 +292,23 @@ public final class MessagingService implements MessagingServiceMBean
Verb.PAGED_RANGE,
Verb.REQUEST_RESPONSE);
+
+ private static final class DroppedMessages
+ {
+ final DroppedMessageMetrics metrics;
+ final AtomicInteger droppedInternalTimeout;
+ final AtomicInteger droppedCrossNodeTimeout;
+
+ DroppedMessages(Verb verb)
+ {
+ this.metrics = new DroppedMessageMetrics(verb);
+ this.droppedInternalTimeout = new AtomicInteger(0);
+ this.droppedCrossNodeTimeout = new AtomicInteger(0);
+ }
+
+ }
// total dropped message counts for server lifetime
- private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
- // dropped count when last requested for the Recent api. high concurrency isn't necessary here.
- private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
+ private final Map<Verb, DroppedMessages> droppedMessagesMap = new EnumMap<>(Verb.class);
private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
@@ -303,31 +317,43 @@ public final class MessagingService implements MessagingServiceMBean
private static class MSHandle
{
- public static final MessagingService instance = new MessagingService();
+ public static final MessagingService instance = new MessagingService(false);
}
+
public static MessagingService instance()
{
return MSHandle.instance;
}
- private MessagingService()
+ private static class MSTestHandle
+ {
+ public static final MessagingService instance = new MessagingService(true);
+ }
+
+ static MessagingService test()
+ {
+ return MSTestHandle.instance;
+ }
+
+ private MessagingService(boolean testOnly)
{
for (Verb verb : DROPPABLE_VERBS)
- {
- droppedMessages.put(verb, new DroppedMessageMetrics(verb));
- lastDroppedInternal.put(verb, 0);
- }
+ droppedMessagesMap.put(verb, new DroppedMessages(verb));
listenGate = new SimpleCondition();
- verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
- Runnable logDropped = new Runnable()
+ verbHandlers = new EnumMap<>(Verb.class);
+
+ if (!testOnly)
{
- public void run()
+ Runnable logDropped = new Runnable()
{
- logDroppedMessages();
- }
- };
- StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ public void run()
+ {
+ logDroppedMessages();
+ }
+ };
+ StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ }
Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
{
@@ -357,16 +383,19 @@ public final class MessagingService implements MessagingServiceMBean
}
};
- callbacks = new ExpiringMap<Integer, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
+ callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
+ if (!testOnly)
{
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
@@ -718,7 +747,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- public void receive(MessageIn message, int id, long timestamp)
+ public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
{
TraceState state = Tracing.instance.initializeFromMessage(message);
if (state != null)
@@ -732,7 +761,7 @@ public final class MessagingService implements MessagingServiceMBean
return;
}
- Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
+ Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
@@ -844,8 +873,13 @@ public final class MessagingService implements MessagingServiceMBean
public void incrementDroppedMessages(Verb verb)
{
+ incrementDroppedMessages(verb, false);
+ }
+
+ public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
+ {
assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
- droppedMessages.get(verb).dropped.mark();
+ incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
}
/**
@@ -853,34 +887,55 @@ public final class MessagingService implements MessagingServiceMBean
*/
private void incrementRejectedMessages(Verb verb)
{
- DroppedMessageMetrics metrics = droppedMessages.get(verb);
- if (metrics == null)
+ DroppedMessages droppedMessages = droppedMessagesMap.get(verb);
+ if (droppedMessages == null)
{
- metrics = new DroppedMessageMetrics(verb);
- droppedMessages.put(verb, metrics);
+ droppedMessages = new DroppedMessages(verb);
+ droppedMessagesMap.put(verb, droppedMessages);
}
- metrics.dropped.mark();
+ incrementDroppedMessages(droppedMessagesMap.get(verb), false);
+ }
+
+ private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
+ {
+ droppedMessages.metrics.dropped.mark();
+ if (isCrossNodeTimeout)
+ droppedMessages.droppedCrossNodeTimeout.incrementAndGet();
+ else
+ droppedMessages.droppedInternalTimeout.incrementAndGet();
}
private void logDroppedMessages()
{
- boolean logTpstats = false;
- for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+ List<String> logs = getDroppedMessagesLogs();
+ for (String log : logs)
+ logger.error(log);
+
+ if (logs.size() > 0)
+ StatusLogger.log();
+ }
+
+ @VisibleForTesting
+ List<String> getDroppedMessagesLogs()
+ {
+ List<String> ret = new ArrayList<>();
+ for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
{
- int dropped = (int) entry.getValue().dropped.count();
Verb verb = entry.getKey();
- int recent = dropped - lastDroppedInternal.get(verb);
- if (recent > 0)
+ DroppedMessages droppedMessages = entry.getValue();
+
+ int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0);
+ int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
+ if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
{
- logTpstats = true;
- logger.info("{} {} messages dropped in last {}ms",
- new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
- lastDroppedInternal.put(verb, dropped);
+ ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
+ verb,
+ LOG_DROPPED_INTERVAL_IN_MS,
+ droppedInternalTimeout,
+ droppedCrossNodeTimeout));
}
}
-
- if (logTpstats)
- StatusLogger.log();
+ return ret;
}
private static class SocketThread extends Thread
@@ -1011,16 +1066,16 @@ public final class MessagingService implements MessagingServiceMBean
public Map<String, Integer> getDroppedMessages()
{
Map<String, Integer> map = new HashMap<String, Integer>();
- for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
- map.put(entry.getKey().toString(), (int) entry.getValue().dropped.count());
+ for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
+ map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.count());
return map;
}
public Map<String, Integer> getRecentlyDroppedMessages()
{
Map<String, Integer> map = new HashMap<String, Integer>();
- for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
- map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped());
+ for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
+ map.put(entry.getKey().toString(), entry.getValue().metrics.getRecentlyDropped());
return map;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/573a1d11/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
new file mode 100644
index 0000000..04dacf3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -0,0 +1,39 @@
+package org.apache.cassandra.net;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MessagingServiceTest
+{
+ private final MessagingService messagingService = MessagingService.test();
+
+ @Test
+ public void testDroppedMessages()
+ {
+ MessagingService.Verb verb = MessagingService.Verb.READ;
+
+ for (int i = 0; i < 5000; i++)
+ messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+ List<String> logs = messagingService.getDroppedMessagesLogs();
+ assertEquals(1, logs.size());
+ assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+ assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
+ assertEquals(5000, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString()));
+
+ logs = messagingService.getDroppedMessagesLogs();
+ assertEquals(0, logs.size());
+
+ for (int i = 0; i < 2500; i++)
+ messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+ logs = messagingService.getDroppedMessagesLogs();
+ assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+ assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
+ assertEquals(2500, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString()));
+ }
+
+}