You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by di...@apache.org on 2016/12/06 23:05:54 UTC
[1/3] cassandra git commit: Expose time spent waiting in thread pool
queue
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.X f1423806e -> 6b6bc6a36
refs/heads/trunk 48591489d -> 3ea0579d8
Expose time spent waiting in thread pool queue
Patch by Dikang Gu; reviewed by T Jake Luciani for CASSANDRA-8398
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b6bc6a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b6bc6a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b6bc6a3
Branch: refs/heads/cassandra-3.X
Commit: 6b6bc6a36c623b8074f0fb27c656b8c26c27cd7e
Parents: f142380
Author: Dikang Gu <di...@gmail.com>
Authored: Wed Nov 30 13:00:49 2016 -0800
Committer: Dikang Gu <di...@gmail.com>
Committed: Tue Dec 6 15:02:31 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/metrics/MessagingMetrics.java | 16 ++++++++++
.../cassandra/net/MessageDeliveryTask.java | 6 ++++
.../org/apache/cassandra/tools/NodeProbe.java | 14 +++++++++
.../tools/nodetool/stats/TpStatsHolder.java | 13 ++++++++
.../tools/nodetool/stats/TpStatsPrinter.java | 17 +++++++++--
.../cassandra/net/MessagingServiceTest.java | 31 ++++++++++++++++++++
7 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bddd823..8b2bed7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.12
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
* Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
* cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index e126c93..5f640b9 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -38,11 +38,13 @@ public class MessagingMetrics
private static final MetricNameFactory factory = new DefaultNameFactory("Messaging");
public final Timer crossNodeLatency;
public final ConcurrentHashMap<String, Timer> dcLatency;
+ public final ConcurrentHashMap<String, Timer> queueWaitLatency;
public MessagingMetrics()
{
crossNodeLatency = Metrics.timer(factory.createMetricName("CrossNodeLatency"));
dcLatency = new ConcurrentHashMap<>();
+ queueWaitLatency = new ConcurrentHashMap<>();
}
public void addTimeTaken(InetAddress from, long timeTaken)
@@ -56,4 +58,18 @@ public class MessagingMetrics
timer.update(timeTaken, TimeUnit.MILLISECONDS);
crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS);
}
+
+ public void addQueueWaitTime(String verb, long timeTaken)
+ {
+ if (timeTaken < 0)
+ // the measurement is not accurate, ignore the negative timeTaken
+ return;
+
+ Timer timer = queueWaitLatency.get(verb);
+ if (timer == null)
+ {
+ timer = queueWaitLatency.computeIfAbsent(verb, k -> Metrics.timer(factory.createMetricName(verb + "-WaitLatency")));
+ }
+ timer.update(timeTaken, TimeUnit.MILLISECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index c91e9da..c7fc991 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -35,17 +36,22 @@ public class MessageDeliveryTask implements Runnable
private final MessageIn message;
private final int id;
+ private final long enqueueTime;
public MessageDeliveryTask(MessageIn message, int id)
{
assert message != null;
this.message = message;
this.id = id;
+ this.enqueueTime = ApproximateTime.currentTimeMillis();
}
public void run()
{
MessagingService.Verb verb = message.verb;
+ MessagingService.instance().metrics.addQueueWaitTime(verb.toString(),
+ ApproximateTime.currentTimeMillis() - enqueueTime);
+
long timeTaken = message.getLifetimeInMS();
if (MessagingService.DROPPABLE_VERBS.contains(verb)
&& timeTaken > message.getTimeout())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8c01891..a48baf8 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1354,6 +1354,20 @@ public class NodeProbe implements AutoCloseable
}
}
+ public CassandraMetricsRegistry.JmxTimerMBean getMessagingQueueWaitMetrics(String verb)
+ {
+ try
+ {
+ return JMX.newMBeanProxy(mbeanServerConn,
+ new ObjectName("org.apache.cassandra.metrics:name=" + verb + "-WaitLatency,type=Messaging"),
+ CassandraMetricsRegistry.JmxTimerMBean.class);
+ }
+ catch (MalformedObjectNameException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Retrieve Proxy metrics
* @param metricName CompletedTasks, PendingTasks, BytesCompacted or TotalCompactionsCompleted.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
index d70b4dd..f3e91dc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
@@ -39,6 +39,7 @@ public class TpStatsHolder implements StatsHolder
HashMap<String, Object> result = new HashMap<>();
HashMap<String, Map<String, Object>> threadPools = new HashMap<>();
HashMap<String, Object> droppedMessage = new HashMap<>();
+ HashMap<String, double[]> waitLatencies = new HashMap<>();
for (Map.Entry<String, String> tp : probe.getThreadPools().entries())
{
@@ -53,8 +54,20 @@ public class TpStatsHolder implements StatsHolder
result.put("ThreadPools", threadPools);
for (Map.Entry<String, Integer> entry : probe.getDroppedMessages().entrySet())
+ {
droppedMessage.put(entry.getKey(), entry.getValue());
+ try
+ {
+ waitLatencies.put(entry.getKey(), probe.metricPercentilesAsArray(probe.getMessagingQueueWaitMetrics(entry.getKey())));
+ }
+ catch (RuntimeException e)
+ {
+ // ignore the exceptions when fetching metrics
+ }
+ }
+
result.put("DroppedMessage", droppedMessage);
+ result.put("WaitLatencies", waitLatencies);
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
index b874746..45fc5ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
@@ -61,12 +61,25 @@ public class TpStatsPrinter
values.get("TotalBlockedTasks"));
}
- out.printf("%n%-20s%10s%n", "Message type", "Dropped");
+ out.printf("%n%-20s%10s%18s%18s%18s%18s%n", "Message type", "Dropped", "", "Latency waiting in queue (micros)", "", "");
+ out.printf("%-20s%10s%18s%18s%18s%18s%n", "", "", "50%", "95%", "99%", "Max");
Map<Object, Object> droppedMessages = convertData.get("DroppedMessage") instanceof Map<?, ?> ? (Map)convertData.get("DroppedMessage") : Collections.emptyMap();
+ Map<Object, double[]> waitLatencies = convertData.get("WaitLatencies") instanceof Map<?, ?> ? (Map)convertData.get("WaitLatencies") : Collections.emptyMap();
for (Map.Entry<Object, Object> entry : droppedMessages.entrySet())
{
- out.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
+ out.printf("%-20s%10s", entry.getKey(), entry.getValue());
+ if (waitLatencies.containsKey(entry.getKey()))
+ {
+ double[] latencies = waitLatencies.get(entry.getKey());
+ out.printf("%18.2f%18.2f%18.2f%18.2f", latencies[0], latencies[2], latencies[4], latencies[6]);
+ }
+ else
+ {
+ out.printf("%18s%18s%18s%18s", "N/A", "N/A", "N/A", "N/A");
+ }
+
+ out.printf("%n");
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 11d17b8..8f8e97e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -129,6 +129,37 @@ public class MessagingServiceTest
}
@Test
+ public void testQueueWaitLatency() throws Exception
+ {
+ int latency = 100;
+ String verb = MessagingService.Verb.MUTATION.toString();
+
+ ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency;
+ queueWaitLatency.clear();
+
+ assertNull(queueWaitLatency.get(verb));
+ MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+ assertNotNull(queueWaitLatency.get(verb));
+ assertEquals(1, queueWaitLatency.get(verb).getCount());
+ long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
+ assertEquals(expectedBucket, queueWaitLatency.get(verb).getSnapshot().getMax());
+ }
+
+ @Test
+ public void testNegativeQueueWaitLatency() throws Exception
+ {
+ int latency = -100;
+ String verb = MessagingService.Verb.MUTATION.toString();
+
+ ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency;
+ queueWaitLatency.clear();
+
+ assertNull(queueWaitLatency.get(verb));
+ MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+ assertNull(queueWaitLatency.get(verb));
+ }
+
+ @Test
public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
{
MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
[2/3] cassandra git commit: Expose time spent waiting in thread pool
queue
Posted by di...@apache.org.
Expose time spent waiting in thread pool queue
Patch by Dikang Gu; reviewed by T Jake Luciani for CASSANDRA-8398
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b6bc6a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b6bc6a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b6bc6a3
Branch: refs/heads/trunk
Commit: 6b6bc6a36c623b8074f0fb27c656b8c26c27cd7e
Parents: f142380
Author: Dikang Gu <di...@gmail.com>
Authored: Wed Nov 30 13:00:49 2016 -0800
Committer: Dikang Gu <di...@gmail.com>
Committed: Tue Dec 6 15:02:31 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/metrics/MessagingMetrics.java | 16 ++++++++++
.../cassandra/net/MessageDeliveryTask.java | 6 ++++
.../org/apache/cassandra/tools/NodeProbe.java | 14 +++++++++
.../tools/nodetool/stats/TpStatsHolder.java | 13 ++++++++
.../tools/nodetool/stats/TpStatsPrinter.java | 17 +++++++++--
.../cassandra/net/MessagingServiceTest.java | 31 ++++++++++++++++++++
7 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bddd823..8b2bed7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.12
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
* Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
* cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
index e126c93..5f640b9 100644
--- a/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/MessagingMetrics.java
@@ -38,11 +38,13 @@ public class MessagingMetrics
private static final MetricNameFactory factory = new DefaultNameFactory("Messaging");
public final Timer crossNodeLatency;
public final ConcurrentHashMap<String, Timer> dcLatency;
+ public final ConcurrentHashMap<String, Timer> queueWaitLatency;
public MessagingMetrics()
{
crossNodeLatency = Metrics.timer(factory.createMetricName("CrossNodeLatency"));
dcLatency = new ConcurrentHashMap<>();
+ queueWaitLatency = new ConcurrentHashMap<>();
}
public void addTimeTaken(InetAddress from, long timeTaken)
@@ -56,4 +58,18 @@ public class MessagingMetrics
timer.update(timeTaken, TimeUnit.MILLISECONDS);
crossNodeLatency.update(timeTaken, TimeUnit.MILLISECONDS);
}
+
+ public void addQueueWaitTime(String verb, long timeTaken)
+ {
+ if (timeTaken < 0)
+ // the measurement is not accurate, ignore the negative timeTaken
+ return;
+
+ Timer timer = queueWaitLatency.get(verb);
+ if (timer == null)
+ {
+ timer = queueWaitLatency.computeIfAbsent(verb, k -> Metrics.timer(factory.createMetricName(verb + "-WaitLatency")));
+ }
+ timer.update(timeTaken, TimeUnit.MILLISECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index c91e9da..c7fc991 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
+import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.IndexNotAvailableException;
@@ -35,17 +36,22 @@ public class MessageDeliveryTask implements Runnable
private final MessageIn message;
private final int id;
+ private final long enqueueTime;
public MessageDeliveryTask(MessageIn message, int id)
{
assert message != null;
this.message = message;
this.id = id;
+ this.enqueueTime = ApproximateTime.currentTimeMillis();
}
public void run()
{
MessagingService.Verb verb = message.verb;
+ MessagingService.instance().metrics.addQueueWaitTime(verb.toString(),
+ ApproximateTime.currentTimeMillis() - enqueueTime);
+
long timeTaken = message.getLifetimeInMS();
if (MessagingService.DROPPABLE_VERBS.contains(verb)
&& timeTaken > message.getTimeout())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8c01891..a48baf8 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1354,6 +1354,20 @@ public class NodeProbe implements AutoCloseable
}
}
+ public CassandraMetricsRegistry.JmxTimerMBean getMessagingQueueWaitMetrics(String verb)
+ {
+ try
+ {
+ return JMX.newMBeanProxy(mbeanServerConn,
+ new ObjectName("org.apache.cassandra.metrics:name=" + verb + "-WaitLatency,type=Messaging"),
+ CassandraMetricsRegistry.JmxTimerMBean.class);
+ }
+ catch (MalformedObjectNameException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Retrieve Proxy metrics
* @param metricName CompletedTasks, PendingTasks, BytesCompacted or TotalCompactionsCompleted.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
index d70b4dd..f3e91dc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsHolder.java
@@ -39,6 +39,7 @@ public class TpStatsHolder implements StatsHolder
HashMap<String, Object> result = new HashMap<>();
HashMap<String, Map<String, Object>> threadPools = new HashMap<>();
HashMap<String, Object> droppedMessage = new HashMap<>();
+ HashMap<String, double[]> waitLatencies = new HashMap<>();
for (Map.Entry<String, String> tp : probe.getThreadPools().entries())
{
@@ -53,8 +54,20 @@ public class TpStatsHolder implements StatsHolder
result.put("ThreadPools", threadPools);
for (Map.Entry<String, Integer> entry : probe.getDroppedMessages().entrySet())
+ {
droppedMessage.put(entry.getKey(), entry.getValue());
+ try
+ {
+ waitLatencies.put(entry.getKey(), probe.metricPercentilesAsArray(probe.getMessagingQueueWaitMetrics(entry.getKey())));
+ }
+ catch (RuntimeException e)
+ {
+ // ignore the exceptions when fetching metrics
+ }
+ }
+
result.put("DroppedMessage", droppedMessage);
+ result.put("WaitLatencies", waitLatencies);
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
index b874746..45fc5ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TpStatsPrinter.java
@@ -61,12 +61,25 @@ public class TpStatsPrinter
values.get("TotalBlockedTasks"));
}
- out.printf("%n%-20s%10s%n", "Message type", "Dropped");
+ out.printf("%n%-20s%10s%18s%18s%18s%18s%n", "Message type", "Dropped", "", "Latency waiting in queue (micros)", "", "");
+ out.printf("%-20s%10s%18s%18s%18s%18s%n", "", "", "50%", "95%", "99%", "Max");
Map<Object, Object> droppedMessages = convertData.get("DroppedMessage") instanceof Map<?, ?> ? (Map)convertData.get("DroppedMessage") : Collections.emptyMap();
+ Map<Object, double[]> waitLatencies = convertData.get("WaitLatencies") instanceof Map<?, ?> ? (Map)convertData.get("WaitLatencies") : Collections.emptyMap();
for (Map.Entry<Object, Object> entry : droppedMessages.entrySet())
{
- out.printf("%-20s%10s%n", entry.getKey(), entry.getValue());
+ out.printf("%-20s%10s", entry.getKey(), entry.getValue());
+ if (waitLatencies.containsKey(entry.getKey()))
+ {
+ double[] latencies = waitLatencies.get(entry.getKey());
+ out.printf("%18.2f%18.2f%18.2f%18.2f", latencies[0], latencies[2], latencies[4], latencies[6]);
+ }
+ else
+ {
+ out.printf("%18s%18s%18s%18s", "N/A", "N/A", "N/A", "N/A");
+ }
+
+ out.printf("%n");
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b6bc6a3/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 11d17b8..8f8e97e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -129,6 +129,37 @@ public class MessagingServiceTest
}
@Test
+ public void testQueueWaitLatency() throws Exception
+ {
+ int latency = 100;
+ String verb = MessagingService.Verb.MUTATION.toString();
+
+ ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency;
+ queueWaitLatency.clear();
+
+ assertNull(queueWaitLatency.get(verb));
+ MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+ assertNotNull(queueWaitLatency.get(verb));
+ assertEquals(1, queueWaitLatency.get(verb).getCount());
+ long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
+ assertEquals(expectedBucket, queueWaitLatency.get(verb).getSnapshot().getMax());
+ }
+
+ @Test
+ public void testNegativeQueueWaitLatency() throws Exception
+ {
+ int latency = -100;
+ String verb = MessagingService.Verb.MUTATION.toString();
+
+ ConcurrentHashMap<String, Timer> queueWaitLatency = MessagingService.instance().metrics.queueWaitLatency;
+ queueWaitLatency.clear();
+
+ assertNull(queueWaitLatency.get(verb));
+ MessagingService.instance().metrics.addQueueWaitTime(verb, latency);
+ assertNull(queueWaitLatency.get(verb));
+ }
+
+ @Test
public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException
{
MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
[3/3] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by di...@apache.org.
Merge branch 'cassandra-3.X' into trunk
* cassandra-3.X:
Expose time spent waiting in thread pool queue
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ea0579d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ea0579d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ea0579d
Branch: refs/heads/trunk
Commit: 3ea0579d89d9a6297d4f94d97728892b5a953489
Parents: 4859148 6b6bc6a
Author: Dikang Gu <di...@gmail.com>
Authored: Tue Dec 6 15:03:39 2016 -0800
Committer: Dikang Gu <di...@gmail.com>
Committed: Tue Dec 6 15:04:24 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/metrics/MessagingMetrics.java | 16 ++++++++++
.../cassandra/net/MessageDeliveryTask.java | 6 ++++
.../org/apache/cassandra/tools/NodeProbe.java | 14 +++++++++
.../tools/nodetool/stats/TpStatsHolder.java | 13 ++++++++
.../tools/nodetool/stats/TpStatsPrinter.java | 17 +++++++++--
.../cassandra/net/MessagingServiceTest.java | 31 ++++++++++++++++++++
7 files changed, 96 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ea0579d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 40407bc,8b2bed7..07177a4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
+4.0
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+
+
3.12
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
* Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
* cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)