You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/07 04:04:01 UTC

git commit: Optimize netty server

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 903069318 -> bc4b008bf


Optimize netty server

Patch by tjake; reviewed by Benedict Elliott Smith for CASSANDRA-6861


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc4b008b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc4b008b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc4b008b

Branch: refs/heads/cassandra-2.1
Commit: bc4b008bf138f3542f228624b9e9a4a4301ea8b2
Parents: 9030693
Author: Jake Luciani <ja...@apache.org>
Authored: Tue May 6 21:22:03 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Tue May 6 21:22:03 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  9 +-
 .../org/apache/cassandra/transport/Frame.java   | 10 ++-
 .../cassandra/transport/FrameCompressor.java    | 94 +++++++++++++++-----
 .../org/apache/cassandra/transport/Message.java | 52 ++++++++---
 .../org/apache/cassandra/transport/Server.java  | 17 ++--
 6 files changed, 142 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 88ff5d2..6564aa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,8 @@
  * Fix bugs in supercolumns handling (CASSANDRA-7138)
  * Fix ClassClassException on composite dense tables (CASSANDRA-7112)
  * Cleanup and optimize collation and slice iterators (CASSANDRA-7107)
- * Upgrade NBHM lib (CASSANDRA-7128) 
+ * Upgrade NBHM lib (CASSANDRA-7128)
+ * Optimize netty server (CASSANDRA-6861)
 Merged from 2.0:
  * Correctly delete scheduled range xfers (CASSANDRA-7143)
  * Make batchlog replica selection rack-aware (CASSANDRA-6551)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 36a7e71..e6ba029 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -30,12 +30,15 @@ import java.util.Map;
 import java.util.UUID;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.AttributeKey;
 import io.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -48,6 +51,9 @@ import org.apache.cassandra.utils.UUIDGen;
  */
 public abstract class CBUtil
 {
+    public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+    public static final ByteBufAllocator onHeapAllocator = new PooledByteBufAllocator(false);
+
     private CBUtil() {}
 
     private static String readString(ByteBuf cb, int length)
@@ -300,7 +306,8 @@ public abstract class CBUtil
         if (slice.nioBufferCount() > 0)
             return slice.nioBuffer();
         else
-            return Unpooled.copiedBuffer(slice).nioBuffer();
+            return ByteBuffer.wrap(readRawBytes(cb));
+
     }
 
     public static void writeValue(byte[] bytes, ByteBuf cb)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 70fe150..bec3c96 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -55,6 +55,11 @@ public class Frame
         this.body = body;
     }
 
+    public void release()
+    {
+        body.release();
+    }
+
     public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
     {
         Header header = new Header(version, flags, streamId, type);
@@ -194,8 +199,7 @@ public class Frame
                 return;
 
             // extract body
-            // TODO: do we need unpooled?
-            ByteBuf body = Unpooled.copiedBuffer(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
+            ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
             buffer.readerIndex(idx + frameLengthInt);
 
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
@@ -238,7 +242,7 @@ public class Frame
         public void encode(ChannelHandlerContext ctx, Frame frame, List results)
         throws IOException
         {
-            ByteBuf header = Unpooled.buffer(Frame.Header.LENGTH);
+            ByteBuf header = CBUtil.allocator.buffer(Frame.Header.LENGTH);
             Message.Type type = frame.header.type;
             header.writeByte(type.direction.addToVersion(frame.header.version));
             header.writeByte(Header.Flag.serialize(frame.header.flags));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index 3e8c555..8312b90 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
 
 import java.io.IOException;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.xerial.snappy.Snappy;
 import org.xerial.snappy.SnappyError;
@@ -74,10 +75,25 @@ public interface FrameCompressor
         public Frame compress(Frame frame) throws IOException
         {
             byte[] input = CBUtil.readRawBytes(frame.body);
-            byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
+            ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.maxCompressedLength(input.length));
 
-            int written = Snappy.compress(input, 0, input.length, output, 0);
-            return frame.with(Unpooled.wrappedBuffer(output, 0, written));
+            try
+            {
+                int written = Snappy.compress(input, 0, input.length, output.array(), output.arrayOffset());
+                output.writerIndex(written);
+            }
+            catch (final Throwable e)
+            {
+                output.release();
+                throw e;
+            }
+            finally
+            {
+                //release the old frame
+                frame.release();
+            }
+
+            return frame.with(output);
         }
 
         public Frame decompress(Frame frame) throws IOException
@@ -87,9 +103,28 @@ public interface FrameCompressor
             if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
                 throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
 
-            byte[] output = new byte[Snappy.uncompressedLength(input)];
-            int size = Snappy.uncompress(input, 0, input.length, output, 0);
-            return frame.with(Unpooled.wrappedBuffer(output, 0, size));
+            ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.uncompressedLength(input));
+
+            try
+            {
+                int size = Snappy.uncompress(input, 0, input.length, output.array(), output.arrayOffset());
+                output.writerIndex(size);
+
+                //release the old frame
+                frame.release();
+            }
+            catch (final Throwable e)
+            {
+                output.release();
+                throw e;
+            }
+            finally
+            {
+                //release the old frame
+                frame.release();
+            }
+
+            return frame.with(output);
         }
     }
 
@@ -121,21 +156,32 @@ public interface FrameCompressor
             byte[] input = CBUtil.readRawBytes(frame.body);
 
             int maxCompressedLength = compressor.maxCompressedLength(input.length);
-            byte[] output = new byte[INTEGER_BYTES + maxCompressedLength];
+            ByteBuf outputBuf = CBUtil.onHeapAllocator.buffer(INTEGER_BYTES + maxCompressedLength);
+
+            byte[] output = outputBuf.array();
+            int outputOffset = outputBuf.arrayOffset();
 
-            output[0] = (byte) (input.length >>> 24);
-            output[1] = (byte) (input.length >>> 16);
-            output[2] = (byte) (input.length >>>  8);
-            output[3] = (byte) (input.length);
+            output[outputOffset + 0] = (byte) (input.length >>> 24);
+            output[outputOffset + 1] = (byte) (input.length >>> 16);
+            output[outputOffset + 2] = (byte) (input.length >>>  8);
+            output[outputOffset + 3] = (byte) (input.length);
 
             try
             {
-                int written = compressor.compress(input, 0, input.length, output, INTEGER_BYTES, maxCompressedLength);
-                return frame.with(Unpooled.wrappedBuffer(output, 0, INTEGER_BYTES + written));
+                int written = compressor.compress(input, 0, input.length, output, outputOffset + INTEGER_BYTES, maxCompressedLength);
+                outputBuf.writerIndex(INTEGER_BYTES + written);
+
+                return frame.with(outputBuf);
+            }
+            catch (final Throwable e)
+            {
+                outputBuf.release();
+                throw e;
             }
-            catch (LZ4Exception e)
+            finally
             {
-                throw new IOException(e);
+                //release the old frame
+                frame.release();
             }
         }
 
@@ -148,19 +194,27 @@ public interface FrameCompressor
                                    | ((input[2] & 0xFF) <<  8)
                                    | ((input[3] & 0xFF));
 
-            byte[] output = new byte[uncompressedLength];
+            ByteBuf output = CBUtil.onHeapAllocator.buffer(uncompressedLength);
 
             try
             {
-                int read = decompressor.decompress(input, INTEGER_BYTES, output, 0, uncompressedLength);
+                int read = decompressor.decompress(input, INTEGER_BYTES, output.array(), output.arrayOffset(), uncompressedLength);
                 if (read != input.length - INTEGER_BYTES)
                     throw new IOException("Compressed lengths mismatch");
 
-                return frame.with(Unpooled.wrappedBuffer(output));
+                output.writerIndex(uncompressedLength);
+
+                return frame.with(output);
+            }
+            catch (final Throwable e)
+            {
+                output.release();
+                throw e;
             }
-            catch (LZ4Exception e)
+            finally
             {
-                throw new IOException(e);
+                //release the old frame
+                frame.release();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 32ae181..e39e02c 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.UUID;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageEncoder;
@@ -120,6 +119,7 @@ public abstract class Message
     public final Type type;
     protected volatile Connection connection;
     private volatile int streamId;
+    private volatile Frame sourceFrame;
 
     protected Message(Type type)
     {
@@ -147,6 +147,16 @@ public abstract class Message
         return streamId;
     }
 
+    public void setSourceFrame(Frame sourceFrame)
+    {
+        this.sourceFrame = sourceFrame;
+    }
+
+    public Frame getSourceFrame()
+    {
+        return sourceFrame;
+    }
+
     public static abstract class Request extends Message
     {
         protected boolean tracingRequested;
@@ -210,6 +220,7 @@ public abstract class Message
             {
                 Message message = frame.header.type.codec.decode(frame.body, frame.header.version);
                 message.setStreamId(frame.header.streamId);
+                message.setSourceFrame(frame);
 
                 if (isRequest)
                 {
@@ -229,8 +240,9 @@ public abstract class Message
 
                 results.add(message);
             }
-            catch (Exception ex)
+            catch (Throwable ex)
             {
+                frame.release();
                 // Remember the streamId
                 throw ErrorMessage.wrap(ex, frame.header.streamId);
             }
@@ -256,24 +268,33 @@ public abstract class Message
                 UUID tracingId = ((Response)message).getTracingId();
                 if (tracingId != null)
                 {
-                    body = Unpooled.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
+                    body = CBUtil.allocator.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
                     CBUtil.writeUUID(tracingId, body);
                     flags.add(Frame.Header.Flag.TRACING);
                 }
                 else
                 {
-                    body = Unpooled.buffer(messageSize);
+                    body = CBUtil.allocator.buffer(messageSize);
                 }
             }
             else
             {
                 assert message instanceof Request;
-                body = Unpooled.buffer(messageSize);
+                body = CBUtil.allocator.buffer(messageSize);
                 if (((Request)message).isTracingRequested())
                     flags.add(Frame.Header.Flag.TRACING);
             }
 
-            codec.encode(message, body, version);
+            try
+            {
+                codec.encode(message, body, version);
+            }
+            catch (Throwable e)
+            {
+                body.release();
+                throw ErrorMessage.wrap(e, message.getStreamId());
+            }
+
             results.add(Frame.create(message.type, message.getStreamId(), version, flags, body));
         }
     }
@@ -281,6 +302,11 @@ public abstract class Message
     @ChannelHandler.Sharable
     public static class Dispatcher extends SimpleChannelInboundHandler<Request>
     {
+        public Dispatcher()
+        {
+            super(false);
+        }
+
         @Override
         public void channelRead0(ChannelHandlerContext ctx, Request request)
         {
@@ -295,16 +321,20 @@ public abstract class Message
                 Response response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
                 response.attach(connection);
+                response.setSourceFrame(request.getSourceFrame());
                 connection.applyStateTransition(request.type, response.type);
 
                 logger.debug("Responding: {}, v={}", response, connection.getVersion());
 
-                ctx.channel().writeAndFlush(response);
+                ctx.writeAndFlush(response, ctx.voidPromise());
             }
-            catch (Exception ex)
+            catch (Throwable ex)
             {
                 // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
-                ctx.channel().writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
+                ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise());
+            }
+            finally {
+                request.getSourceFrame().release();
             }
         }
 
@@ -314,13 +344,13 @@ public abstract class Message
         {
             if (ctx.channel().isOpen())
             {
-                ChannelFuture future = ctx.channel().writeAndFlush(ErrorMessage.fromException(cause));
+                ChannelFuture future = ctx.writeAndFlush(ErrorMessage.fromException(cause));
                 // On protocol exception, close the channel as soon as the message have been sent
                 if (cause instanceof ProtocolException)
                 {
                     future.addListener(new ChannelFutureListener() {
                         public void operationComplete(ChannelFuture future) {
-                            ctx.channel().close();
+                            ctx.close();
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index eb2b043..9c43f09 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -27,6 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +78,7 @@ public class Server implements CassandraDaemon.Server
     public final InetSocketAddress socket;
     private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
-    private EventLoopGroup bossGroup, workerGroup;
+    private EventLoopGroup workerGroup;
     private EventExecutor eventExecutorGroup;
 
     public Server(InetSocketAddress socket)
@@ -135,15 +138,16 @@ public class Server implements CassandraDaemon.Server
 
         // Configure the server.
         eventExecutorGroup = new RequestThreadPoolExecutor();
-        bossGroup = new NioEventLoopGroup();
         workerGroup = new NioEventLoopGroup();
 
         ServerBootstrap bootstrap = new ServerBootstrap()
-                                    .group(bossGroup, workerGroup)
+                                    .group(workerGroup)
                                     .channel(NioServerSocketChannel.class)
-                                    .childOption(ChannelOption.TCP_NODELAY, true);
+                                    .childOption(ChannelOption.TCP_NODELAY, true)
+                                    .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
+                                    .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
+                                    .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
 
-        // Set up the event pipeline factory.
         final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
         if (clientEnc.enabled)
         {
@@ -156,6 +160,7 @@ public class Server implements CassandraDaemon.Server
         }
 
         // Bind and start to accept incoming connections.
+        logger.info("Using Netty Version: {}", Version.identify().entrySet());
         logger.info("Starting listening for CQL clients on {}...", socket);
         Channel channel = bootstrap.bind(socket).channel();
         connectionTracker.allChannels.add(channel);
@@ -178,9 +183,7 @@ public class Server implements CassandraDaemon.Server
     {
         // Close opened connections
         connectionTracker.closeAll();
-        bossGroup.shutdownGracefully();
         workerGroup.shutdownGracefully();
-        bossGroup = null;
         workerGroup = null;
 
         eventExecutorGroup.shutdown();