You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/08 00:32:05 UTC
svn commit: r1199008 - in
/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net:
MessagingService.java MessagingServiceMBean.java OutboundTcpConnection.java
Author: jbellis
Date: Mon Nov 7 23:32:04 2011
New Revision: 1199008
URL: http://svn.apache.org/viewvc?rev=1199008&view=rev
Log:
#3005
Modified:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 7 23:32:04 2011
@@ -674,6 +674,14 @@ public final class MessagingService impl
return completedTasks;
}
+ public Map<String, Long> getCommandDroppedTasks()
+ {
+ Map<String, Long> droppedTasks = new HashMap<String, Long>();
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+ droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
+ return droppedTasks;
+ }
+
public Map<String, Integer> getResponsePendingTasks()
{
Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Nov 7 23:32:04 2011
@@ -41,6 +41,11 @@ public interface MessagingServiceMBean
public Map<String, Long> getCommandCompletedTasks();
/**
+ * Dropped tasks for Command(Mutations, Read etc) TCP Connections
+ */
+ public Map<String, Long> getCommandDroppedTasks();
+
+ /**
* Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
*/
public Map<String, Integer> getResponsePendingTasks();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Nov 7 23:32:04 2011
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
@@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
public class OutboundTcpConnection extends Thread
{
@@ -47,12 +47,18 @@ public class OutboundTcpConnection exten
MessagingService.version_);
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
- private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
+
+ // sending thread reads from "active" (one of the two queues declared below) until it is empty.
+ // then it swaps it with "backlog."
+ private volatile BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>();
+ private volatile BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();
+
private final OutboundTcpConnectionPool poolReference;
private DataOutputStream out;
private Socket socket;
- private long completedCount;
+ private volatile long completed;
+ private final AtomicLong dropped = new AtomicLong();
public OutboundTcpConnection(OutboundTcpConnectionPool pool)
{
@@ -62,9 +68,10 @@ public class OutboundTcpConnection exten
public void enqueue(Message message, String id)
{
+ expireMessages();
try
{
- queue.put(Pair.create(message, id));
+ backlog.put(new Entry(message, id, System.currentTimeMillis()));
}
catch (InterruptedException e)
{
@@ -74,7 +81,8 @@ public class OutboundTcpConnection exten
void closeSocket()
{
- queue.clear();
+ active.clear();
+ backlog.clear();
enqueue(CLOSE_SENTINEL, null);
}
@@ -82,30 +90,54 @@ public class OutboundTcpConnection exten
{
while (true)
{
- Pair<Message, String> pair = take();
- Message m = pair.left;
- String id = pair.right;
+ Entry entry = active.poll();
+ if (entry == null)
+ {
+ // exhausted the active queue. switch to backlog, once there's something to process there
+ try
+ {
+ entry = backlog.take();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ BlockingQueue<Entry> tmp = backlog;
+ backlog = active;
+ active = tmp;
+ }
+
+ Message m = entry.message;
+ String id = entry.id;
if (m == CLOSE_SENTINEL)
{
disconnect();
continue;
}
- if (socket != null || connect())
+ if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+ dropped.incrementAndGet();
+ else if (socket != null || connect())
writeConnected(m, id);
else
// clear out the queue, else gossip messages back up.
- queue.clear();
+ active.clear();
}
}
public int getPendingMessages()
{
- return queue.size();
+ return active.size() + backlog.size();
}
public long getCompletedMesssages()
{
- return completedCount;
+ return completed;
+ }
+
+ public long getDroppedMessages()
+ {
+ return dropped.get();
}
private void writeConnected(Message message, String id)
@@ -113,7 +145,8 @@ public class OutboundTcpConnection exten
try
{
write(message, id, out);
- if (queue.peek() == null)
+ completed++;
+ if (active.peek() == null)
{
out.flush();
}
@@ -182,21 +215,6 @@ public class OutboundTcpConnection exten
}
}
- private Pair<Message, String> take()
- {
- Pair<Message, String> pair;
- try
- {
- pair = queue.take();
- completedCount++;
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- return pair;
- }
-
private boolean connect()
{
if (logger.isDebugEnabled())
@@ -229,4 +247,41 @@ public class OutboundTcpConnection exten
}
return false;
}
+
+ private void expireMessages()
+ {
+ while (true)
+ {
+ Entry entry = backlog.peek();
+ if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+ break;
+
+ Entry entry2 = backlog.poll();
+ if (entry2 != entry)
+ {
+ // sending thread switched queues. add this entry (from the "new" backlog)
+ // at the end of the active queue, which keeps it in the same position relative to the other entries
+ // without having to contend with other clients for the head-of-backlog lock.
+ if (entry2 != null)
+ active.add(entry2);
+ break;
+ }
+
+ dropped.incrementAndGet();
+ }
+ }
+
+ private static class Entry
+ {
+ final Message message;
+ final String id;
+ final long timestamp;
+
+ Entry(Message message, String id, long timestamp)
+ {
+ this.message = message;
+ this.id = id;
+ this.timestamp = timestamp;
+ }
+ }
}