You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/08/13 14:39:58 UTC
cassandra git commit: Route gossip messages over dedicated socket
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 85d550f94 -> 9a6ee93f6
Route gossip messages over dedicated socket
patch by awiesberg; reviewed by jasobrown for CASSANDRA-9237
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a6ee93f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a6ee93f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a6ee93f
Branch: refs/heads/cassandra-2.2
Commit: 9a6ee93f63301d03ff1d20032e6d80f806a14d5d
Parents: 85d550f
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Aug 13 05:38:45 2015 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Aug 13 05:38:45 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/metrics/ConnectionMetrics.java | 30 ++++++++++++++++++++
.../org/apache/cassandra/net/MessageOut.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 24 ++++++++++++++++
.../cassandra/net/MessagingServiceMBean.java | 15 ++++++++++
.../net/OutboundTcpConnectionPool.java | 14 +++++++--
.../cassandra/tools/nodetool/NetStats.java | 8 ++++++
7 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cff477b..3b548e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.1
+ * Route gossip messages over dedicated socket (CASSANDRA-9237)
* Add checksum to saved cache files (CASSANDRA-9265)
* Log warning when using an aggregate without partition key (CASSANDRA-9737)
* Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index 9661c48..f01c06d 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -50,6 +50,12 @@ public class ConnectionMetrics
public final Gauge<Long> smallMessageCompletedTasks;
/** Dropped tasks for small message TCP Connections */
public final Gauge<Long> smallMessageDroppedTasks;
+ /** Pending tasks for gossip message TCP Connections */
+ public final Gauge<Integer> gossipMessagePendingTasks;
+ /** Completed tasks for gossip message TCP Connections */
+ public final Gauge<Long> gossipMessageCompletedTasks;
+ /** Dropped tasks for gossip message TCP Connections */
+ public final Gauge<Long> gossipMessageDroppedTasks;
/** Number of timeouts for specific IP */
public final Meter timeouts;
@@ -111,6 +117,27 @@ public class ConnectionMetrics
return connectionPool.smallMessages.getDroppedMessages();
}
});
+ gossipMessagePendingTasks = Metrics.register(factory.createMetricName("GossipMessagePendingTasks"), new Gauge<Integer>()
+ {
+ public Integer getValue()
+ {
+ return connectionPool.gossipMessages.getPendingMessages();
+ }
+ });
+ gossipMessageCompletedTasks = Metrics.register(factory.createMetricName("GossipMessageCompletedTasks"), new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return connectionPool.gossipMessages.getCompletedMesssages();
+ }
+ });
+ gossipMessageDroppedTasks = Metrics.register(factory.createMetricName("GossipMessageDroppedTasks"), new Gauge<Long>()
+ {
+ public Long getValue()
+ {
+ return connectionPool.gossipMessages.getDroppedMessages();
+ }
+ });
timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
}
@@ -122,6 +149,9 @@ public class ConnectionMetrics
Metrics.remove(factory.createMetricName("SmallMessagePendingTasks"));
Metrics.remove(factory.createMetricName("SmallMessageCompletedTasks"));
Metrics.remove(factory.createMetricName("SmallMessageDroppedTasks"));
+ Metrics.remove(factory.createMetricName("GossipMessagePendingTasks"));
+ Metrics.remove(factory.createMetricName("GossipMessageCompletedTasks"));
+ Metrics.remove(factory.createMetricName("GossipMessageDroppedTasks"));
Metrics.remove(factory.createMetricName("Timeouts"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 28038b3..1e291c2 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -88,7 +88,7 @@ public class MessageOut<T>
return new MessageOut<T>(verb, payload, serializer, builder.build());
}
- private Stage getStage()
+ public Stage getStage()
{
return MessagingService.verbStages.get(verb);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f8fd6fd..944dced 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1066,6 +1066,30 @@ public final class MessagingService implements MessagingServiceMBean
return droppedTasks;
}
+ public Map<String, Integer> getGossipMessagePendingTasks()
+ {
+ Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size());
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+ pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getPendingMessages());
+ return pendingTasks;
+ }
+
+ public Map<String, Long> getGossipMessageCompletedTasks()
+ {
+ Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size());
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+ completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getCompletedMesssages());
+ return completedTasks;
+ }
+
+ public Map<String, Long> getGossipMessageDroppedTasks()
+ {
+ Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size());
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+ droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getDroppedMessages());
+ return droppedTasks;
+ }
+
public Map<String, Integer> getDroppedMessages()
{
Map<String, Integer> map = new HashMap<>(droppedMessagesMap.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index f1b418c..3bcb0d5 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -59,6 +59,21 @@ public interface MessagingServiceMBean
public Map<String, Long> getSmallMessageDroppedTasks();
/**
+ * Pending tasks for gossip message TCP Connections
+ */
+ public Map<String, Integer> getGossipMessagePendingTasks();
+
+ /**
+ * Completed tasks for gossip message TCP Connections
+ */
+ public Map<String, Long> getGossipMessageCompletedTasks();
+
+ /**
+ * Dropped tasks for gossip message TCP Connections
+ */
+ public Map<String, Long> getGossipMessageDroppedTasks();
+
+ /**
* dropped message counts for server lifetime
*/
public Map<String, Integer> getDroppedMessages();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index d388f29..0e6d2cc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -44,6 +44,8 @@ public class OutboundTcpConnectionPool
private final CountDownLatch started;
public final OutboundTcpConnection smallMessages;
public final OutboundTcpConnection largeMessages;
+ public final OutboundTcpConnection gossipMessages;
+
// pointer to the reset Address.
private InetAddress resetEndpoint;
private ConnectionMetrics metrics;
@@ -56,6 +58,7 @@ public class OutboundTcpConnectionPool
smallMessages = new OutboundTcpConnection(this);
largeMessages = new OutboundTcpConnection(this);
+ gossipMessages = new OutboundTcpConnection(this);
}
/**
@@ -64,6 +67,8 @@ public class OutboundTcpConnectionPool
*/
OutboundTcpConnection getConnection(MessageOut msg)
{
+ if (Stage.GOSSIP == msg.getStage())
+ return gossipMessages;
return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD
? largeMessages
: smallMessages;
@@ -71,13 +76,13 @@ public class OutboundTcpConnectionPool
void reset()
{
- for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+ for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
conn.closeSocket(false);
}
public void resetToNewerVersion(int version)
{
- for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+ for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
{
if (version > conn.getTargetVersion())
conn.softCloseSocket();
@@ -93,7 +98,7 @@ public class OutboundTcpConnectionPool
{
SystemKeyspace.updatePreferredIP(id, remoteEP);
resetEndpoint = remoteEP;
- for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+ for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
conn.softCloseSocket();
// release previous metrics and create new one with reset address
@@ -170,6 +175,7 @@ public class OutboundTcpConnectionPool
{
smallMessages.start();
largeMessages.start();
+ gossipMessages.start();
metrics = new ConnectionMetrics(id, this);
@@ -203,6 +209,8 @@ public class OutboundTcpConnectionPool
largeMessages.closeSocket(true);
if (smallMessages != null)
smallMessages.closeSocket(true);
+ if (gossipMessages != null)
+ gossipMessages.closeSocket(true);
metrics.release();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index cd2f1a2..3e06ca0 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -110,6 +110,14 @@ public class NetStats extends NodeToolCmd
for (long n : ms.getSmallMessageCompletedTasks().values())
completed += n;
System.out.printf("%-25s%10s%10s%15s%n", "Small messages", "n/a", pending, completed);
+
+ pending = 0;
+ for (int n : ms.getGossipMessagePendingTasks().values())
+ pending += n;
+ completed = 0;
+ for (long n : ms.getGossipMessageCompletedTasks().values())
+ completed += n;
+ System.out.printf("%-25s%10s%10s%15s%n", "Gossip messages", "n/a", pending, completed);
}
}
}
\ No newline at end of file