You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/11/22 00:21:14 UTC

git commit: Batch read from OTC's queue and cleanup patch by jasorown and belliotsmith for CASSANDRA-1632

Updated Branches:
  refs/heads/trunk c550cc829 -> fdbddc132


Batch read from OTC's queue and cleanup
patch by jasorown and belliotsmith for CASSANDRA-1632


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fdbddc13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fdbddc13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fdbddc13

Branch: refs/heads/trunk
Commit: fdbddc13272bed6dfa3019e77acfb5f9107d6dce
Parents: c550cc8
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Nov 21 15:20:28 2013 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Nov 21 15:20:28 2013 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/net/OutboundTcpConnection.java    | 95 ++++++++++----------
 2 files changed, 51 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a3de3c..06628a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
  * User-defined types for CQL3 (CASSANDRA-5590)
  * Use of o.a.c.metrics in nodetool (CASSANDRA-5871)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 
 
 2.0.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 3bdbca3..7422f22 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,6 +25,9 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -65,10 +68,7 @@ public class OutboundTcpConnection extends Thread
 
     static final int LZ4_HASH_SEED = 0x9747b28c;
 
-    // sending thread reads from "active" (one of queue1, queue2) until it is empty.
-    // then it swaps it with "backlog."
-    private volatile BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<QueuedMessage>();
-    private volatile BlockingQueue<QueuedMessage> active = new LinkedBlockingQueue<QueuedMessage>();
+    private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>();
 
     private final OutboundTcpConnectionPool poolReference;
 
@@ -76,6 +76,7 @@ public class OutboundTcpConnection extends Thread
     private Socket socket;
     private volatile long completed;
     private final AtomicLong dropped = new AtomicLong();
+    private volatile int currentMsgBufferCount = 0;
     private int targetVersion;
 
     public OutboundTcpConnection(OutboundTcpConnectionPool pool)
@@ -93,7 +94,8 @@ public class OutboundTcpConnection extends Thread
 
     public void enqueue(MessageOut<?> message, int id)
     {
-        expireMessages();
+        if (backlog.size() > 1024)
+            expireMessages();
         try
         {
             backlog.put(new QueuedMessage(message, id));
@@ -106,7 +108,6 @@ public class OutboundTcpConnection extends Thread
 
     void closeSocket(boolean destroyThread)
     {
-        active.clear();
         backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
         enqueue(CLOSE_SENTINEL, -1);
@@ -124,47 +125,61 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
+        // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
+        final List<QueuedMessage> drainedMessages = new ArrayList<>(128);
+        outer:
         while (true)
         {
-            QueuedMessage qm = active.poll();
-            if (qm == null)
+            if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0)
             {
-                // exhausted the active queue.  switch to backlog, once there's something to process there
                 try
                 {
-                    qm = backlog.take();
+                    drainedMessages.add(backlog.take());
                 }
                 catch (InterruptedException e)
                 {
                     throw new AssertionError(e);
                 }
 
-                BlockingQueue<QueuedMessage> tmp = backlog;
-                backlog = active;
-                active = tmp;
             }
+            currentMsgBufferCount = drainedMessages.size();
 
-            MessageOut<?> m = qm.message;
-            if (m == CLOSE_SENTINEL)
+            int count = drainedMessages.size();
+            for (QueuedMessage qm : drainedMessages)
             {
-                disconnect();
-                if (isStopped)
-                    break;
-                continue;
+                try
+                {
+                    MessageOut<?> m = qm.message;
+                    if (m == CLOSE_SENTINEL)
+                    {
+                        disconnect();
+                        if (isStopped)
+                            break outer;
+                        continue;
+                    }
+                    if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
+                        dropped.incrementAndGet();
+                    else if (socket != null || connect())
+                        writeConnected(qm, count == 1 && backlog.size() == 0);
+                    else
+                        // clear out the queue, else gossip messages back up.
+                        backlog.clear();
+                }
+                catch (Exception e)
+                {
+                    // really shouldn't get here, as exception handling in writeConnected() is reasonably robust
+                    // but we want to catch anything bad we don't drop the messages in the current batch
+                    logger.error("error processing a message intended for {}", poolReference.endPoint(), e);
+                }
+                currentMsgBufferCount = --count;
             }
-            if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
-                dropped.incrementAndGet();
-            else if (socket != null || connect())
-                writeConnected(qm);
-            else
-                // clear out the queue, else gossip messages back up.
-                active.clear();
+            drainedMessages.clear();
         }
     }
 
     public int getPendingMessages()
     {
-        return active.size() + backlog.size();
+        return backlog.size() + currentMsgBufferCount;
     }
 
     public long getCompletedMesssages()
@@ -184,7 +199,7 @@ public class OutboundTcpConnection extends Thread
                || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
     }
 
-    private void writeConnected(QueuedMessage qm)
+    private void writeConnected(QueuedMessage qm, boolean flush)
     {
         try
         {
@@ -210,7 +225,7 @@ public class OutboundTcpConnection extends Thread
             writeInternal(qm.message, qm.id, qm.timestamp);
 
             completed++;
-            if (active.peek() == null)
+            if (flush)
                 out.flush();
         }
         catch (Exception e)
@@ -435,23 +450,13 @@ public class OutboundTcpConnection extends Thread
 
     private void expireMessages()
     {
-        while (true)
+        Iterator<QueuedMessage> iter = backlog.iterator();
+        while (iter.hasNext())
         {
-            QueuedMessage qm = backlog.peek();
-            if (qm == null || qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
-                break;
-
-            QueuedMessage qm2 = backlog.poll();
-            if (qm2 != qm)
-            {
-                // sending thread switched queues.  add this entry (from the "new" backlog)
-                // at the end of the active queue, which keeps it in the same position relative to the other entries
-                // without having to contend with other clients for the head-of-backlog lock.
-                if (qm2 != null)
-                    active.add(qm2);
-                break;
-            }
-
+            QueuedMessage qm = iter.next();
+            if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
+                return;
+            iter.remove();
             dropped.incrementAndGet();
         }
     }