You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/08 00:32:05 UTC

svn commit: r1199008 - in /cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net: MessagingService.java MessagingServiceMBean.java OutboundTcpConnection.java

Author: jbellis
Date: Mon Nov  7 23:32:04 2011
New Revision: 1199008

URL: http://svn.apache.org/viewvc?rev=1199008&view=rev
Log:
#3005

Modified:
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov  7 23:32:04 2011
@@ -674,6 +674,14 @@ public final class MessagingService impl
         return completedTasks;
     }
 
+    public Map<String, Long> getCommandDroppedTasks()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, Long>();
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
+        return droppedTasks;
+    }
+
     public Map<String, Integer> getResponsePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, Integer>();

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Nov  7 23:32:04 2011
@@ -41,6 +41,11 @@ public interface MessagingServiceMBean
     public Map<String, Long> getCommandCompletedTasks();
 
     /**
+     * Dropped tasks for Command(Mutations, Read etc) TCP Connections
+     */
+    public Map<String, Long> getCommandDroppedTasks();
+
+    /**
      * Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
      */
     public Map<String, Integer> getResponsePendingTasks();

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1199008&r1=1199007&r2=1199008&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Nov  7 23:32:04 2011
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.Socket;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 public class OutboundTcpConnection extends Thread
 {
@@ -47,12 +47,18 @@ public class OutboundTcpConnection exten
                                                               MessagingService.version_);
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-    private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
+
+    // sending thread reads from "active" (one of queue1, queue2) until it is empty.
+    // then it swaps it with "backlog."
+    private volatile BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>();
+    private volatile BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();
+
     private final OutboundTcpConnectionPool poolReference;    
 
     private DataOutputStream out;
     private Socket socket;
-    private long completedCount;
+    private volatile long completed;
+    private final AtomicLong dropped = new AtomicLong();
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
@@ -62,9 +68,10 @@ public class OutboundTcpConnection exten
 
     public void enqueue(Message message, String id)
     {
+        expireMessages();
         try
         {
-            queue.put(Pair.create(message, id));
+            backlog.put(new Entry(message, id, System.currentTimeMillis()));
         }
         catch (InterruptedException e)
         {
@@ -74,7 +81,8 @@ public class OutboundTcpConnection exten
 
     void closeSocket()
     {
-        queue.clear();
+        active.clear();
+        backlog.clear();
         enqueue(CLOSE_SENTINEL, null);
     }
 
@@ -82,30 +90,54 @@ public class OutboundTcpConnection exten
     {
         while (true)
         {
-            Pair<Message, String> pair = take();
-            Message m = pair.left;
-            String id = pair.right;
+            Entry entry = active.poll();
+            if (entry == null)
+            {
+                // exhausted the active queue.  switch to backlog, once there's something to process there
+                try
+                {
+                    entry = backlog.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+                BlockingQueue<Entry> tmp = backlog;
+                backlog = active;
+                active = tmp;
+            }
+
+            Message m = entry.message;
+            String id = entry.id;
             if (m == CLOSE_SENTINEL)
             {
                 disconnect();
                 continue;
             }
-            if (socket != null || connect())
+            if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+                dropped.incrementAndGet();
+            else if (socket != null || connect())
                 writeConnected(m, id);
             else
                 // clear out the queue, else gossip messages back up.
-                queue.clear();            
+                active.clear();
         }
     }
 
     public int getPendingMessages()
     {
-        return queue.size();
+        return active.size() + backlog.size();
     }
 
     public long getCompletedMesssages()
     {
-        return completedCount;
+        return completed;
+    }
+
+    public long getDroppedMessages()
+    {
+        return dropped.get();
     }
 
     private void writeConnected(Message message, String id)
@@ -113,7 +145,8 @@ public class OutboundTcpConnection exten
         try
         {
             write(message, id, out);
-            if (queue.peek() == null)
+            completed++;
+            if (active.peek() == null)
             {
                 out.flush();
             }
@@ -182,21 +215,6 @@ public class OutboundTcpConnection exten
         }
     }
 
-    private Pair<Message, String> take()
-    {
-        Pair<Message, String> pair;
-        try
-        {
-            pair = queue.take();
-            completedCount++;
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        return pair;
-    }
-
     private boolean connect()
     {
         if (logger.isDebugEnabled())
@@ -229,4 +247,41 @@ public class OutboundTcpConnection exten
         }
         return false;
     }
+
+    private void expireMessages()
+    {
+        while (true)
+        {
+            Entry entry = backlog.peek();
+            if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
+                break;
+
+            Entry entry2 = backlog.poll();
+            if (entry2 != entry)
+            {
+                // sending thread switched queues.  add this entry (from the "new" backlog)
+                // at the end of the active queue, which keeps it in the same position relative to the other entries
+                // without having to contend with other clients for the head-of-backlog lock.
+                if (entry2 != null)
+                    active.add(entry2);
+                break;
+            }
+
+            dropped.incrementAndGet();
+        }
+    }
+
+    private static class Entry
+    {
+        final Message message;
+        final String id;
+        final long timestamp;
+
+        Entry(Message message, String id, long timestamp)
+        {
+            this.message = message;
+            this.id = id;
+            this.timestamp = timestamp;
+        }
+    }
 }