You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/11/08 00:33:25 UTC

svn commit: r1199009 - in /cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net: MessagingService.java MessagingServiceMBean.java OutboundTcpConnection.java

Author: jbellis
Date: Mon Nov  7 23:33:25 2011
New Revision: 1199009

URL: http://svn.apache.org/viewvc?rev=1199009&view=rev
Log:
revert #3005 (will keep 1.1-only)

Modified:
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingService.java Mon Nov  7 23:33:25 2011
@@ -674,14 +674,6 @@ public final class MessagingService impl
         return completedTasks;
     }
 
-    public Map<String, Long> getCommandDroppedTasks()
-    {
-        Map<String, Long> droppedTasks = new HashMap<String, Long>();
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers_.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().cmdCon.getDroppedMessages());
-        return droppedTasks;
-    }
-
     public Map<String, Integer> getResponsePendingTasks()
     {
         Map<String, Integer> pendingTasks = new HashMap<String, Integer>();

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/MessagingServiceMBean.java Mon Nov  7 23:33:25 2011
@@ -41,11 +41,6 @@ public interface MessagingServiceMBean
     public Map<String, Long> getCommandCompletedTasks();
 
     /**
-     * Dropped tasks for Command(Mutations, Read etc) TCP Connections
-     */
-    public Map<String, Long> getCommandDroppedTasks();
-
-    /**
      * Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections
      */
     public Map<String, Integer> getResponsePendingTasks();

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1199009&r1=1199008&r2=1199009&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Mon Nov  7 23:33:25 2011
@@ -27,7 +27,6 @@ import java.io.IOException;
 import java.net.Socket;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
@@ -36,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 public class OutboundTcpConnection extends Thread
 {
@@ -47,18 +47,12 @@ public class OutboundTcpConnection exten
                                                               MessagingService.version_);
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
-    // sending thread reads from "active" (one of queue1, queue2) until it is empty.
-    // then it swaps it with "backlog."
-    private volatile BlockingQueue<Entry> backlog = new LinkedBlockingQueue<Entry>();
-    private volatile BlockingQueue<Entry> active = new LinkedBlockingQueue<Entry>();
-
+    private final BlockingQueue<Pair<Message, String>> queue = new LinkedBlockingQueue<Pair<Message, String>>();
     private final OutboundTcpConnectionPool poolReference;    
 
     private DataOutputStream out;
     private Socket socket;
-    private volatile long completed;
-    private final AtomicLong dropped = new AtomicLong();
+    private long completedCount;
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
     {
@@ -68,10 +62,9 @@ public class OutboundTcpConnection exten
 
     public void enqueue(Message message, String id)
     {
-        expireMessages();
         try
         {
-            backlog.put(new Entry(message, id, System.currentTimeMillis()));
+            queue.put(Pair.create(message, id));
         }
         catch (InterruptedException e)
         {
@@ -81,8 +74,7 @@ public class OutboundTcpConnection exten
 
     void closeSocket()
     {
-        active.clear();
-        backlog.clear();
+        queue.clear();
         enqueue(CLOSE_SENTINEL, null);
     }
 
@@ -90,54 +82,30 @@ public class OutboundTcpConnection exten
     {
         while (true)
         {
-            Entry entry = active.poll();
-            if (entry == null)
-            {
-                // exhausted the active queue.  switch to backlog, once there's something to process there
-                try
-                {
-                    entry = backlog.take();
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-
-                BlockingQueue<Entry> tmp = backlog;
-                backlog = active;
-                active = tmp;
-            }
-
-            Message m = entry.message;
-            String id = entry.id;
+            Pair<Message, String> pair = take();
+            Message m = pair.left;
+            String id = pair.right;
             if (m == CLOSE_SENTINEL)
             {
                 disconnect();
                 continue;
             }
-            if (entry.timestamp < System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
-                dropped.incrementAndGet();
-            else if (socket != null || connect())
+            if (socket != null || connect())
                 writeConnected(m, id);
             else
                 // clear out the queue, else gossip messages back up.
-                active.clear();
+                queue.clear();            
         }
     }
 
     public int getPendingMessages()
     {
-        return active.size() + backlog.size();
+        return queue.size();
     }
 
     public long getCompletedMesssages()
     {
-        return completed;
-    }
-
-    public long getDroppedMessages()
-    {
-        return dropped.get();
+        return completedCount;
     }
 
     private void writeConnected(Message message, String id)
@@ -145,8 +113,7 @@ public class OutboundTcpConnection exten
         try
         {
             write(message, id, out);
-            completed++;
-            if (active.peek() == null)
+            if (queue.peek() == null)
             {
                 out.flush();
             }
@@ -215,6 +182,21 @@ public class OutboundTcpConnection exten
         }
     }
 
+    private Pair<Message, String> take()
+    {
+        Pair<Message, String> pair;
+        try
+        {
+            pair = queue.take();
+            completedCount++;
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        return pair;
+    }
+
     private boolean connect()
     {
         if (logger.isDebugEnabled())
@@ -247,41 +229,4 @@ public class OutboundTcpConnection exten
         }
         return false;
     }
-
-    private void expireMessages()
-    {
-        while (true)
-        {
-            Entry entry = backlog.peek();
-            if (entry == null || entry.timestamp >= System.currentTimeMillis() - DatabaseDescriptor.getRpcTimeout())
-                break;
-
-            Entry entry2 = backlog.poll();
-            if (entry2 != entry)
-            {
-                // sending thread switched queues.  add this entry (from the "new" backlog)
-                // at the end of the active queue, which keeps it in the same position relative to the other entries
-                // without having to contend with other clients for the head-of-backlog lock.
-                if (entry2 != null)
-                    active.add(entry2);
-                break;
-            }
-
-            dropped.incrementAndGet();
-        }
-    }
-
-    private static class Entry
-    {
-        final Message message;
-        final String id;
-        final long timestamp;
-
-        Entry(Message message, String id, long timestamp)
-        {
-            this.message = message;
-            this.id = id;
-            this.timestamp = timestamp;
-        }
-    }
 }