You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2015/12/24 14:03:58 UTC
[1/2] cassandra git commit: Revert "Add latency logging for dropped messages"
Repository: cassandra
Updated Branches:
refs/heads/trunk bb25f5bdd -> c9ef25fd8
Revert "Add latency logging for dropped messages"
This reverts commit 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd5c8bbc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd5c8bbc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd5c8bbc
Branch: refs/heads/trunk
Commit: bd5c8bbc04e017089743b27cce55635dac00b98e
Parents: bb25f5b
Author: Joshua McKenzie <jm...@apache.org>
Authored: Thu Dec 24 07:59:31 2015 -0500
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 07:59:31 2015 -0500
----------------------------------------------------------------------
.../cassandra/net/MessageDeliveryTask.java | 42 ++-----------------
.../apache/cassandra/service/StorageProxy.java | 44 ++------------------
2 files changed, 7 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index bede3d8..818cfc6 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,13 +18,11 @@
package org.apache.cassandra.net;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.IMutation;
+
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -45,11 +43,10 @@ public class MessageDeliveryTask implements Runnable
public void run()
{
- long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
MessagingService.Verb verb = message.verb;
- if (MessagingService.DROPPABLE_VERBS.contains(verb)&& message.getTimeout() > timeTaken)
+ if (MessagingService.DROPPABLE_VERBS.contains(verb)
+ && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
{
- LogDroppedMessageDetails(timeTaken);
MessagingService.instance().incrementDroppedMessages(message);
return;
}
@@ -85,37 +82,6 @@ public class MessageDeliveryTask implements Runnable
Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
}
- private void LogDroppedMessageDetails(long timeTaken)
- {
- logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}",
- timeTaken, message.getTimeout(), message.toString());
- // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements)
- IMutation mutation;
- if (message.payload instanceof IMutation)
- {
- mutation = (IMutation)message.payload;
- if (mutation != null)
- {
- logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
- }
- }
- else if (message.payload instanceof Collection<?>)
- {
- Collection<?> payloadItems = (Collection<?>)message.payload;
- for (Object payloadItem : payloadItems)
- {
- if (payloadItem instanceof IMutation)
- {
- mutation = (IMutation)payloadItem;
- if (mutation != null)
- {
- logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
- }
- }
- }
- }
- }
-
private void handleFailure(Throwable t)
{
if (message.doCallbackOnFailure())
@@ -129,4 +95,4 @@ public class MessageDeliveryTask implements Runnable
private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
MessagingService.Verb.GOSSIP_DIGEST_ACK2,
MessagingService.Verb.GOSSIP_DIGEST_SYN);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd5c8bbc/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1c30cd7..f161607 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
submitHint(mutation, endpointsToHint, responseHandler);
if (insertLocal)
- performLocally(stage, mutation, mutation::apply, responseHandler);
+ performLocally(stage, mutation::apply, responseHandler);
if (dcGroups != null)
{
@@ -1286,27 +1286,6 @@ public class StorageProxy implements StorageProxyMBean
});
}
- private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
- {
- StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
- {
- public void runMayThrow()
- {
- try
- {
- runnable.run();
- handler.response(null);
- }
- catch (Exception ex)
- {
- if (!(ex instanceof WriteTimeoutException))
- logger.error("Failed to apply mutation locally : {}", ex);
- handler.onFailure(FBUtilities.getBroadcastAddress());
- }
- }
- });
- }
-
/**
* Handle counter mutation on the coordinator host.
*
@@ -2429,28 +2408,11 @@ public class StorageProxy implements StorageProxyMBean
private static abstract class LocalMutationRunnable implements Runnable
{
private final long constructionTime = System.currentTimeMillis();
- private IMutation mutation;
-
- public LocalMutationRunnable(IMutation mutation)
- {
- this.mutation = mutation;
- }
-
- public LocalMutationRunnable()
- {
- }
public final void run()
{
- long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
- if (System.currentTimeMillis() > constructionTime + mutationTimeout)
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
{
- long timeTaken = System.currentTimeMillis() - constructionTime;
- logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout);
- if (this.mutation != null)
- {
- logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray()));
- }
MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
{
@@ -2634,4 +2596,4 @@ public class StorageProxy implements StorageProxyMBean
public long getReadRepairRepairedBackground() {
return ReadRepairMetrics.repairedBackground.getCount();
}
-}
\ No newline at end of file
+}
[2/2] cassandra git commit: Add latency logging for dropped messages
Posted by jm...@apache.org.
Add latency logging for dropped messages
Patch by akale; reviewed by pmotta for CASSANDRA-10580
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9ef25fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9ef25fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9ef25fd
Branch: refs/heads/trunk
Commit: c9ef25fd81501005b6484baf064081efc557f3f4
Parents: bd5c8bb
Author: anubhavkale <an...@microsoft.com>
Authored: Tue Dec 15 21:39:16 2015 -0800
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Thu Dec 24 08:01:53 2015 -0500
----------------------------------------------------------------------
.../cassandra/db/ReadCommandVerbHandler.java | 2 +-
.../metrics/DroppedMessageMetrics.java | 10 ++++++
.../cassandra/net/MessageDeliveryTask.java | 5 +--
.../apache/cassandra/net/MessagingService.java | 37 ++++++++++++++++----
.../apache/cassandra/service/StorageProxy.java | 13 ++++---
.../cassandra/net/MessagingServiceTest.java | 10 +++---
6 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 9eaa8fa..b2fb876 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
if (!command.complete())
{
Tracing.trace("Discarding partial response to {} (timed out)", message.from);
- MessagingService.instance().incrementDroppedMessages(message);
+ MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
index 58c80fb..2a94c9f 100644
--- a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.metrics;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
import org.apache.cassandra.net.MessagingService;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -30,9 +32,17 @@ public class DroppedMessageMetrics
/** Number of dropped messages */
public final Meter dropped;
+ /** The dropped latency within node */
+ public final Timer internalDroppedLatency;
+
+ /** The cross node dropped latency */
+ public final Timer crossNodeDroppedLatency;
+
public DroppedMessageMetrics(MessagingService.Verb verb)
{
MetricNameFactory factory = new DefaultNameFactory("DroppedMessage", verb.toString());
dropped = Metrics.meter(factory.createMetricName("Dropped"));
+ internalDroppedLatency = Metrics.timer(factory.createMetricName("InternalDroppedLatency"));
+ crossNodeDroppedLatency = Metrics.timer(factory.createMetricName("CrossNodeDroppedLatency"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 818cfc6..d9f8b7c 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -44,10 +44,11 @@ public class MessageDeliveryTask implements Runnable
public void run()
{
MessagingService.Verb verb = message.verb;
+ long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
if (MessagingService.DROPPABLE_VERBS.contains(verb)
- && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
+ && timeTaken > message.getTimeout())
{
- MessagingService.instance().incrementDroppedMessages(message);
+ MessagingService.instance().incrementDroppedMessages(message, timeTaken);
return;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index fab082a..d95c49b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -957,9 +957,20 @@ public final class MessagingService implements MessagingServiceMBean
incrementDroppedMessages(verb, false);
}
- public void incrementDroppedMessages(MessageIn message)
+ public void incrementDroppedMessages(Verb verb, long timeTaken)
{
- incrementDroppedMessages(message.verb, message.constructionTime.isCrossNode);
+ incrementDroppedMessages(verb, timeTaken, false);
+ }
+
+ public void incrementDroppedMessages(MessageIn message, long timeTaken)
+ {
+ incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
+ }
+
+ public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout)
+ {
+ assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
+ incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout);
}
public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
@@ -968,6 +979,15 @@ public final class MessagingService implements MessagingServiceMBean
incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
}
+ private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
+ {
+ if (isCrossNodeTimeout)
+ droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+ else
+ droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
+ incrementDroppedMessages(droppedMessages, isCrossNodeTimeout);
+ }
+
private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
{
droppedMessages.metrics.dropped.mark();
@@ -1000,11 +1020,14 @@ public final class MessagingService implements MessagingServiceMBean
int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
{
- ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
- verb,
- LOG_DROPPED_INTERVAL_IN_MS,
- droppedInternalTimeout,
- droppedCrossNodeTimeout));
+ ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout."
+ + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms",
+ verb,
+ LOG_DROPPED_INTERVAL_IN_MS,
+ droppedInternalTimeout,
+ droppedCrossNodeTimeout,
+ TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()),
+ TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
}
}
return ret;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f161607..2e32f16 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1732,7 +1732,7 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- MessagingService.instance().incrementDroppedMessages(verb);
+ MessagingService.instance().incrementDroppedMessages(verb, System.currentTimeMillis() - constructionTime);
handler.onFailure(FBUtilities.getBroadcastAddress());
}
@@ -2383,9 +2383,10 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- if (System.currentTimeMillis() > constructionTime + timeout)
+ long timeTaken = System.currentTimeMillis() - constructionTime;
+ if (timeTaken > timeout)
{
- MessagingService.instance().incrementDroppedMessages(verb);
+ MessagingService.instance().incrementDroppedMessages(verb, timeTaken);
return;
}
try
@@ -2411,9 +2412,11 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+ long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+ long timeTaken = System.currentTimeMillis() - constructionTime;
+ if (timeTaken > mutationTimeout)
{
- MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+ MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION, timeTaken);
HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
{
protected void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9ef25fd/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 75c146e..3b9c957 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -15,22 +15,22 @@ public class MessagingServiceTest
{
MessagingService.Verb verb = MessagingService.Verb.READ;
- for (int i = 0; i < 5000; i++)
- messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+ for (int i = 1; i <= 5000; i++)
+ messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
List<String> logs = messagingService.getDroppedMessagesLogs();
assertEquals(1, logs.size());
- assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+ assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0));
assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
logs = messagingService.getDroppedMessagesLogs();
assertEquals(0, logs.size());
for (int i = 0; i < 2500; i++)
- messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+ messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
logs = messagingService.getDroppedMessagesLogs();
- assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+ assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0));
assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
}