You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/08 00:33:25 UTC
svn commit: r1199009 - in
/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net:
MessagingService.java MessagingServiceMBean.java OutboundTcpConnection.java
Author: jbellis
Date: Mon Nov 7 23:33:25 2011
New Revision: 1199009
URL: http://svn.apache.org/viewvc?rev=1199009&view=rev
Log:
revert #3005 (will keep 1.1-only)
Modified:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov 7 23:33:25 2011
@@ -674,14 +674,6 @@ public final class MessagingService impl
return completedTasks;
}
- public Map<String, Long> getCommandDroppedTasks()
- {
- Map<String, Long> droppedTasks = new HashMap<String, Long>();
- for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
- droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
- return droppedTasks;
- }
-
public Map<String, Integer> getResponsePendingTasks()
{
Map<String, Integer> pendingTasks = new HashMap<String, Integer>();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Nov 7 23:33:25 2011
@@ -41,11 +41,6 @@ public interface MessagingServiceMBean
public Map<String, Long> getCommandCompletedTasks();
/**
- * Dropped tasks for Command(Mutations, Read etc) TCP Connections
- */
- public Map<String, Long> getCommandDroppedTasks();
-
- /**
* Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
*/
public Map<String, Integer> getResponsePendingTasks();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Nov 7 23:33:25 2011
@@ -27,7 +27,6 @@ import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
@@ -36,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
public class OutboundTcpConnection extends Thread
{
@@ -47,18 +47,12 @@ public class OutboundTcpConnection exten
MessagingService.version_);
private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
- // sending thread reads from "active" (one of queue1, queue2) until it is empty.
- // then it swaps it with "backlog."
- private volatile BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>();
- private volatile BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();
-
+ private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
private final OutboundTcpConnectionPool poolReference;
private DataOutputStream out;
private Socket socket;
- private volatile long completed;
- private final AtomicLong dropped = new AtomicLong();
+ private long completedCount;
public OutboundTcpConnection(OutboundTcpConnectionPool pool)
{
@@ -68,10 +62,9 @@ public class OutboundTcpConnection exten
public void enqueue(Message message, String id)
{
- expireMessages();
try
{
- backlog.put(new Entry(message, id, System.currentTimeMillis()));
+ queue.put(Pair.create(message, id));
}
catch (InterruptedException e)
{
@@ -81,8 +74,7 @@ public class OutboundTcpConnection exten
void closeSocket()
{
- active.clear();
- backlog.clear();
+ queue.clear();
enqueue(CLOSE_SENTINEL, null);
}
@@ -90,54 +82,30 @@ public class OutboundTcpConnection exten
{
while (true)
{
- Entry entry = active.poll();
- if (entry == null)
- {
- // exhausted the active queue. switch to backlog, once there's something to process there
- try
- {
- entry = backlog.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- BlockingQueue<Entry> tmp = backlog;
- backlog = active;
- active = tmp;
- }
-
- Message m = entry.message;
- String id = entry.id;
+ Pair<Message, String> pair = take();
+ Message m = pair.left;
+ String id = pair.right;
if (m == CLOSE_SENTINEL)
{
disconnect();
continue;
}
- if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
- dropped.incrementAndGet();
- else if (socket != null || connect())
+ if (socket != null || connect())
writeConnected(m, id);
else
// clear out the queue, else gossip messages back up.
- active.clear();
+ queue.clear();
}
}
public int getPendingMessages()
{
- return active.size() + backlog.size();
+ return queue.size();
}
public long getCompletedMesssages()
{
- return completed;
- }
-
- public long getDroppedMessages()
- {
- return dropped.get();
+ return completedCount;
}
private void writeConnected(Message message, String id)
@@ -145,8 +113,7 @@ public class OutboundTcpConnection exten
try
{
write(message, id, out);
- completed++;
- if (active.peek() == null)
+ if (queue.peek() == null)
{
out.flush();
}
@@ -215,6 +182,21 @@ public class OutboundTcpConnection exten
}
}
+ private Pair<Message, String> take()
+ {
+ Pair<Message, String> pair;
+ try
+ {
+ pair = queue.take();
+ completedCount++;
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ return pair;
+ }
+
private boolean connect()
{
if (logger.isDebugEnabled())
@@ -247,41 +229,4 @@ public class OutboundTcpConnection exten
}
return false;
}
-
- private void expireMessages()
- {
- while (true)
- {
- Entry entry = backlog.peek();
- if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
- break;
-
- Entry entry2 = backlog.poll();
- if (entry2 != entry)
- {
- // sending thread switched queues. add this entry (from the "new" backlog)
- // at the end of the active queue, which keeps it in the same position relative to the other entries
- // without having to contend with other clients for the head-of-backlog lock.
- if (entry2 != null)
- active.add(entry2);
- break;
- }
-
- dropped.incrementAndGet();
- }
- }
-
- private static class Entry
- {
- final Message message;
- final String id;
- final long timestamp;
-
- Entry(Message message, String id, long timestamp)
- {
- this.message = message;
- this.id = id;
- this.timestamp = timestamp;
- }
- }
}