You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/14 19:14:01 UTC

[1/2] git commit: Add server side batching to native transport

Repository: cassandra
Updated Branches:
  refs/heads/trunk 091db6482 -> 30aec9339


Add server side batching to native transport

Patch by bes; reviewed by tjake for CASSANDRA-5663


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6efd35c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6efd35c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6efd35c

Branch: refs/heads/trunk
Commit: c6efd35cf20db59c917b590100a7d09555fa4854
Parents: 8a5365e
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 14 13:10:32 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 14 13:10:32 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/transport/Message.java | 117 +++++++++++++++++--
 2 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fd5a8b..3dd47a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Optimize netty server (CASSANDRA-6861)
  * Fix repair hang when given CF does not exist (CASSANDRA-7189)
  * Allow c* to be shutdown in an embedded mode (CASSANDRA-5635)
+ * Add server side batching to native transport (CASSANDRA-5663)
 Merged from 2.0:
  * (Hadoop) Close java driver Cluster in CQLRR.close (CASSANDRA-7228)
  * Warn when 'USING TIMESTAMP' is used on a CAS BATCH (CASSANDRA-7067)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6efd35c/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index e39e02c..0ad4312 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -17,9 +17,16 @@
  */
 package org.apache.cassandra.transport;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
@@ -302,6 +309,87 @@ public abstract class Message
     @ChannelHandler.Sharable
     public static class Dispatcher extends SimpleChannelInboundHandler<Request>
     {
+        private static class FlushItem
+        {
+            final ChannelHandlerContext ctx;
+            final Response response;
+            private FlushItem(ChannelHandlerContext ctx, Response response)
+            {
+                this.ctx = ctx;
+                this.response = response;
+            }
+        }
+
+        private final class Flusher implements Runnable
+        {
+            final EventLoop eventLoop;
+            final ConcurrentLinkedQueue<FlushItem> queued = new ConcurrentLinkedQueue<>();
+            final AtomicBoolean running = new AtomicBoolean(false);
+            final HashSet<ChannelHandlerContext> channels = new HashSet<>();
+            final List<FlushItem> flushed = new ArrayList<>();
+            int runsSinceFlush = 0;
+            int runsWithNoWork = 0;
+            private Flusher(EventLoop eventLoop)
+            {
+                this.eventLoop = eventLoop;
+            }
+            void start()
+            {
+                if (running.compareAndSet(false, true))
+                {
+                    this.eventLoop.execute(this);
+                }
+            }
+            public void run()
+            {
+
+                boolean doneWork = false;
+                FlushItem flush;
+                while ( null != (flush = queued.poll()) )
+                {
+                    channels.add(flush.ctx);
+                    flush.ctx.write(flush.response, flush.ctx.voidPromise());
+                    flushed.add(flush);
+                    doneWork = true;
+                }
+
+                runsSinceFlush++;
+
+                if (!doneWork || runsSinceFlush > 2 || flushed.size() > 50)
+                {
+                    for (ChannelHandlerContext channel : channels)
+                        channel.flush();
+                    for (FlushItem item : flushed)
+                    {
+                        if (item.response.getSourceFrame().body.refCnt() > 0)
+                            item.response.getSourceFrame().release();
+                    }
+                    channels.clear();
+                    flushed.clear();
+                    runsSinceFlush = 0;
+                }
+
+                if (doneWork)
+                {
+                    runsWithNoWork = 0;
+                }
+                else
+                {
+                    // either reschedule or cancel
+                    if (++runsWithNoWork > 5)
+                    {
+                        running.set(false);
+                        if (queued.isEmpty() || !running.compareAndSet(false, true))
+                            return;
+                    }
+                }
+
+                eventLoop.schedule(this, 10000, TimeUnit.NANOSECONDS);
+            }
+        }
+
+        private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
+
         public Dispatcher()
         {
             super(false);
@@ -310,32 +398,45 @@ public abstract class Message
         @Override
         public void channelRead0(ChannelHandlerContext ctx, Request request)
         {
+
+            final Response response;
+            final ServerConnection connection;
+
             try
             {
                 assert request.connection() instanceof ServerConnection;
-                ServerConnection connection = (ServerConnection)request.connection();
+                connection = (ServerConnection)request.connection();
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
 
                 logger.debug("Received: {}, v={}", request, connection.getVersion());
 
-                Response response = request.execute(qstate);
+                response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
                 response.attach(connection);
                 response.setSourceFrame(request.getSourceFrame());
                 connection.applyStateTransition(request.type, response.type);
-
-                logger.debug("Responding: {}, v={}", response, connection.getVersion());
-
-                ctx.writeAndFlush(response, ctx.voidPromise());
             }
             catch (Throwable ex)
             {
+                request.getSourceFrame().release();
                 // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
                 ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise());
+                return;
             }
-            finally {
-                request.getSourceFrame().release();
+
+            logger.debug("Responding: {}, v={}", response, connection.getVersion());
+
+            EventLoop loop = ctx.channel().eventLoop();
+            Flusher flusher = flusherLookup.get(loop);
+            if (flusher == null)
+            {
+                Flusher alt = flusherLookup.putIfAbsent(loop, flusher = new Flusher(loop));
+                if (alt != null)
+                    flusher = alt;
             }
+
+            flusher.queued.add(new FlushItem(ctx, response));
+            flusher.start();
         }
 
         @Override


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/30aec933
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/30aec933
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/30aec933

Branch: refs/heads/trunk
Commit: 30aec9339620a1a2454763400586e6b9249433f3
Parents: 091db64 c6efd35
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 14 13:13:42 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 14 13:13:42 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/transport/Message.java | 117 +++++++++++++++++--
 2 files changed, 110 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/30aec933/CHANGES.txt
----------------------------------------------------------------------