You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/14 19:14:01 UTC
[1/2] git commit: Add server side batching to native transport
Repository: cassandra
Updated Branches:
refs/heads/trunk 091db6482 -> 30aec9339
Add server side batching to native transport
Patch by bes; reviewed by tjake for CASSANDRA-5663
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6efd35c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6efd35c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6efd35c
Branch: refs/heads/trunk
Commit: c6efd35cf20db59c917b590100a7d09555fa4854
Parents: 8a5365e
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 14 13:10:32 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 14 13:10:32 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/transport/Message.java | 117 +++++++++++++++++--
2 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fd5a8b..3dd47a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
* Optimize netty server (CASSANDRA-6861)
* Fix repair hang when given CF does not exist (CASSANDRA-7189)
* Allow c* to be shutdown in an embedded mode (CASSANDRA-5635)
+ * Add server side batching to native transport (CASSANDRA-5663)
Merged from 2.0:
* (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228)
* Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index e39e02c..0ad4312 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -17,9 +17,16 @@
*/
package org.apache.cassandra.transport;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
@@ -302,6 +309,87 @@ public abstract class Message
@ChannelHandler.Sharable
public static class Dispatcher extends SimpleChannelInboundHandler<Request>
{
+ private static class FlushItem
+ {
+ final ChannelHandlerContext ctx;
+ final Response response;
+ private FlushItem(ChannelHandlerContext ctx, Response response)
+ {
+ this.ctx = ctx;
+ this.response = response;
+ }
+ }
+
+ private final class Flusher implements Runnable
+ {
+ final EventLoop eventLoop;
+ final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
+ final AtomicBoolean running = new AtomicBoolean(false);
+ final HashSet<ChannelHandlerContext> channels = new HashSet<>();
+ final List<FlushItem> flushed = new ArrayList<>();
+ int runsSinceFlush = 0;
+ int runsWithNoWork = 0;
+ private Flusher(EventLoop eventLoop)
+ {
+ this.eventLoop = eventLoop;
+ }
+ void start()
+ {
+ if (running.compareAndSet(false, true))
+ {
+ this.eventLoop.execute(this);
+ }
+ }
+ public void run()
+ {
+
+ boolean doneWork = false;
+ FlushItem flush;
+ while ( null != (flush = queued.poll()) )
+ {
+ channels.add(flush.ctx);
+ flush.ctx.write(flush.response, flush.ctx.voidPromise());
+ flushed.add(flush);
+ doneWork = true;
+ }
+
+ runsSinceFlush++;
+
+ if (!doneWork || runsSinceFlush > 2 || flushed.size() > 50)
+ {
+ for (ChannelHandlerContext channel : channels)
+ channel.flush();
+ for (FlushItem item : flushed)
+ {
+ if (item.response.getSourceFrame().body.refCnt() > 0)
+ item.response.getSourceFrame().release();
+ }
+ channels.clear();
+ flushed.clear();
+ runsSinceFlush = 0;
+ }
+
+ if (doneWork)
+ {
+ runsWithNoWork = 0;
+ }
+ else
+ {
+ // either reschedule or cancel
+ if (++runsWithNoWork > 5)
+ {
+ running.set(false);
+ if (queued.isEmpty() || !running.compareAndSet(false, true))
+ return;
+ }
+ }
+
+ eventLoop.schedule(this, 10000, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
+
public Dispatcher()
{
super(false);
@@ -310,32 +398,45 @@ public abstract class Message
@Override
public void channelRead0(ChannelHandlerContext ctx, Request request)
{
+
+ final Response response;
+ final ServerConnection connection;
+
try
{
assert request.connection() instanceof ServerConnection;
- ServerConnection connection = (ServerConnection)request.connection();
+ connection = (ServerConnection)request.connection();
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
logger.debug("Received: {}, v={}", request, connection.getVersion());
- Response response = request.execute(qstate);
+ response = request.execute(qstate);
response.setStreamId(request.getStreamId());
response.attach(connection);
response.setSourceFrame(request.getSourceFrame());
connection.applyStateTransition(request.type, response.type);
-
- logger.debug("Responding: {}, v={}", response, connection.getVersion());
-
- ctx.writeAndFlush(response, ctx.voidPromise());
}
catch (Throwable ex)
{
+ request.getSourceFrame().release();
// Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise());
+ return;
}
- finally {
- request.getSourceFrame().release();
+
+ logger.debug("Responding: {}, v={}", response, connection.getVersion());
+
+ EventLoop loop = ctx.channel().eventLoop();
+ Flusher flusher = flusherLookup.get(loop);
+ if (flusher == null)
+ {
+ Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+ if (alt != null)
+ flusher = alt;
}
+
+ flusher.queued.add(new FlushItem(ctx, response));
+ flusher.start();
}
@Override
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/30aec933
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/30aec933
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/30aec933
Branch: refs/heads/trunk
Commit: 30aec9339620a1a2454763400586e6b9249433f3
Parents: 091db64 c6efd35
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 14 13:13:42 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 14 13:13:42 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/transport/Message.java | 117 +++++++++++++++++--
2 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/30aec933/CHANGES.txt
----------------------------------------------------------------------