You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2013/11/22 00:21:14 UTC
git commit: Batch read from OTC's queue and cleanup patch by jasorown
and belliotsmith for CASSANDRA-1632
Updated Branches:
refs/heads/trunk c550cc829 -> fdbddc132
Batch read from OTC's queue and cleanup
patch by jasorown and belliotsmith for CASSANDRA-1632
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fdbddc13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fdbddc13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fdbddc13
Branch: refs/heads/trunk
Commit: fdbddc13272bed6dfa3019e77acfb5f9107d6dce
Parents: c550cc8
Author: Jason Brown <ja...@gmail.com>
Authored: Thu Nov 21 15:20:28 2013 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Nov 21 15:20:28 2013 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/net/OutboundTcpConnection.java | 95 ++++++++++----------
2 files changed, 51 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a3de3c..06628a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
* Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
* User-defined types for CQL3 (CASSANDRA-5590)
* Use of o.a.c.metrics in nodetool (CASSANDRA-5871)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
2.0.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fdbddc13/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 3bdbca3..7422f22 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,6 +25,9 @@ import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -65,10 +68,7 @@ public class OutboundTcpConnection extends Thread
static final int LZ4_HASH_SEED = 0x9747b28c;
- // sending thread reads from "active" (one of queue1, queue2) until it is empty.
- // then it swaps it with "backlog."
- private volatile BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<QueuedMessage>();
- private volatile BlockingQueue<QueuedMessage> active = new LinkedBlockingQueue<QueuedMessage>();
+ private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>();
private final OutboundTcpConnectionPool poolReference;
@@ -76,6 +76,7 @@ public class OutboundTcpConnection extends Thread
private Socket socket;
private volatile long completed;
private final AtomicLong dropped = new AtomicLong();
+ private volatile int currentMsgBufferCount = 0;
private int targetVersion;
public OutboundTcpConnection(OutboundTcpConnectionPool pool)
@@ -93,7 +94,8 @@ public class OutboundTcpConnection extends Thread
public void enqueue(MessageOut<?> message, int id)
{
- expireMessages();
+ if (backlog.size() > 1024)
+ expireMessages();
try
{
backlog.put(new QueuedMessage(message, id));
@@ -106,7 +108,6 @@ public class OutboundTcpConnection extends Thread
void closeSocket(boolean destroyThread)
{
- active.clear();
backlog.clear();
isStopped = destroyThread; // Exit loop to stop the thread
enqueue(CLOSE_SENTINEL, -1);
@@ -124,47 +125,61 @@ public class OutboundTcpConnection extends Thread
public void run()
{
+ // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
+ final List<QueuedMessage> drainedMessages = new ArrayList<>(128);
+ outer:
while (true)
{
- QueuedMessage qm = active.poll();
- if (qm == null)
+ if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0)
{
- // exhausted the active queue. switch to backlog, once there's something to process there
try
{
- qm = backlog.take();
+ drainedMessages.add(backlog.take());
}
catch (InterruptedException e)
{
throw new AssertionError(e);
}
- BlockingQueue<QueuedMessage> tmp = backlog;
- backlog = active;
- active = tmp;
}
+ currentMsgBufferCount = drainedMessages.size();
- MessageOut<?> m = qm.message;
- if (m == CLOSE_SENTINEL)
+ int count = drainedMessages.size();
+ for (QueuedMessage qm : drainedMessages)
{
- disconnect();
- if (isStopped)
- break;
- continue;
+ try
+ {
+ MessageOut<?> m = qm.message;
+ if (m == CLOSE_SENTINEL)
+ {
+ disconnect();
+ if (isStopped)
+ break outer;
+ continue;
+ }
+ if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
+ dropped.incrementAndGet();
+ else if (socket != null || connect())
+ writeConnected(qm, count == 1 && backlog.size() == 0);
+ else
+ // clear out the queue, else gossip messages back up.
+ backlog.clear();
+ }
+ catch (Exception e)
+ {
+ // really shouldn't get here, as exception handling in writeConnected() is reasonably robust
+ // but we want to catch anything bad we don't drop the messages in the current batch
+ logger.error("error processing a message intended for {}", poolReference.endPoint(), e);
+ }
+ currentMsgBufferCount = --count;
}
- if (qm.timestamp < System.currentTimeMillis() - m.getTimeout())
- dropped.incrementAndGet();
- else if (socket != null || connect())
- writeConnected(qm);
- else
- // clear out the queue, else gossip messages back up.
- active.clear();
+ drainedMessages.clear();
}
}
public int getPendingMessages()
{
- return active.size() + backlog.size();
+ return backlog.size() + currentMsgBufferCount;
}
public long getCompletedMesssages()
@@ -184,7 +199,7 @@ public class OutboundTcpConnection extends Thread
|| (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
}
- private void writeConnected(QueuedMessage qm)
+ private void writeConnected(QueuedMessage qm, boolean flush)
{
try
{
@@ -210,7 +225,7 @@ public class OutboundTcpConnection extends Thread
writeInternal(qm.message, qm.id, qm.timestamp);
completed++;
- if (active.peek() == null)
+ if (flush)
out.flush();
}
catch (Exception e)
@@ -435,23 +450,13 @@ public class OutboundTcpConnection extends Thread
private void expireMessages()
{
- while (true)
+ Iterator<QueuedMessage> iter = backlog.iterator();
+ while (iter.hasNext())
{
- QueuedMessage qm = backlog.peek();
- if (qm == null || qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
- break;
-
- QueuedMessage qm2 = backlog.poll();
- if (qm2 != qm)
- {
- // sending thread switched queues. add this entry (from the "new" backlog)
- // at the end of the active queue, which keeps it in the same position relative to the other entries
- // without having to contend with other clients for the head-of-backlog lock.
- if (qm2 != null)
- active.add(qm2);
- break;
- }
-
+ QueuedMessage qm = iter.next();
+ if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
+ return;
+ iter.remove();
dropped.incrementAndGet();
}
}