You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this conversion.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/07 04:04:01 UTC
git commit: Optimize netty server
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 903069318 -> bc4b008bf
Optimize netty server
Patch by tjake; reviewed by Benedict Elliott Smith for CASSANDRA-6861
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bc4b008b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bc4b008b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bc4b008b
Branch: refs/heads/cassandra-2.1
Commit: bc4b008bf138f3542f228624b9e9a4a4301ea8b2
Parents: 9030693
Author: Jake Luciani <ja...@apache.org>
Authored: Tue May 6 21:22:03 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Tue May 6 21:22:03 2014 -0400
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/transport/CBUtil.java | 9 +-
.../org/apache/cassandra/transport/Frame.java | 10 ++-
.../cassandra/transport/FrameCompressor.java | 94 +++++++++++++++-----
.../org/apache/cassandra/transport/Message.java | 52 ++++++++---
.../org/apache/cassandra/transport/Server.java | 17 ++--
6 files changed, 142 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 88ff5d2..6564aa6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,7 +3,8 @@
* Fix bugs in supercolumns handling (CASSANDRA-7138)
* Fix ClassClassException on composite dense tables (CASSANDRA-7112)
* Cleanup and optimize collation and slice iterators (CASSANDRA-7107)
- * Upgrade NBHM lib (CASSANDRA-7128)
+ * Upgrade NBHM lib (CASSANDRA-7128)
+ * Optimize netty server (CASSANDRA-6861)
Merged from 2.0:
* Correctly delete scheduled range xfers (CASSANDRA-7143)
* Make batchlog replica selection rack-aware (CASSANDRA-6551)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 36a7e71..e6ba029 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -30,12 +30,15 @@ import java.util.Map;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -48,6 +51,9 @@ import org.apache.cassandra.utils.UUIDGen;
*/
public abstract class CBUtil
{
+ public static final ByteBufAllocator allocator = new PooledByteBufAllocator(true);
+ public static final ByteBufAllocator onHeapAllocator = new PooledByteBufAllocator(false);
+
private CBUtil() {}
private static String readString(ByteBuf cb, int length)
@@ -300,7 +306,8 @@ public abstract class CBUtil
if (slice.nioBufferCount() > 0)
return slice.nioBuffer();
else
- return Unpooled.copiedBuffer(slice).nioBuffer();
+ return ByteBuffer.wrap(readRawBytes(cb));
+
}
public static void writeValue(byte[] bytes, ByteBuf cb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 70fe150..bec3c96 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -55,6 +55,11 @@ public class Frame
this.body = body;
}
+ public void release()
+ {
+ body.release();
+ }
+
public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
{
Header header = new Header(version, flags, streamId, type);
@@ -194,8 +199,7 @@ public class Frame
return;
// extract body
- // TODO: do we need unpooled?
- ByteBuf body = Unpooled.copiedBuffer(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
+ ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
buffer.readerIndex(idx + frameLengthInt);
Connection connection = ctx.channel().attr(Connection.attributeKey).get();
@@ -238,7 +242,7 @@ public class Frame
public void encode(ChannelHandlerContext ctx, Frame frame, List results)
throws IOException
{
- ByteBuf header = Unpooled.buffer(Frame.Header.LENGTH);
+ ByteBuf header = CBUtil.allocator.buffer(Frame.Header.LENGTH);
Message.Type type = frame.header.type;
header.writeByte(type.direction.addToVersion(frame.header.version));
header.writeByte(Header.Flag.serialize(frame.header.flags));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/FrameCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java
index 3e8c555..8312b90 100644
--- a/src/java/org/apache/cassandra/transport/FrameCompressor.java
+++ b/src/java/org/apache/cassandra/transport/FrameCompressor.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport;
import java.io.IOException;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.xerial.snappy.Snappy;
import org.xerial.snappy.SnappyError;
@@ -74,10 +75,25 @@ public interface FrameCompressor
public Frame compress(Frame frame) throws IOException
{
byte[] input = CBUtil.readRawBytes(frame.body);
- byte[] output = new byte[Snappy.maxCompressedLength(input.length)];
+ ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.maxCompressedLength(input.length));
- int written = Snappy.compress(input, 0, input.length, output, 0);
- return frame.with(Unpooled.wrappedBuffer(output, 0, written));
+ try
+ {
+ int written = Snappy.compress(input, 0, input.length, output.array(), output.arrayOffset());
+ output.writerIndex(written);
+ }
+ catch (final Throwable e)
+ {
+ output.release();
+ throw e;
+ }
+ finally
+ {
+ //release the old frame
+ frame.release();
+ }
+
+ return frame.with(output);
}
public Frame decompress(Frame frame) throws IOException
@@ -87,9 +103,28 @@ public interface FrameCompressor
if (!Snappy.isValidCompressedBuffer(input, 0, input.length))
throw new ProtocolException("Provided frame does not appear to be Snappy compressed");
- byte[] output = new byte[Snappy.uncompressedLength(input)];
- int size = Snappy.uncompress(input, 0, input.length, output, 0);
- return frame.with(Unpooled.wrappedBuffer(output, 0, size));
+ ByteBuf output = CBUtil.onHeapAllocator.buffer(Snappy.uncompressedLength(input));
+
+ try
+ {
+ int size = Snappy.uncompress(input, 0, input.length, output.array(), output.arrayOffset());
+ output.writerIndex(size);
+
+ //release the old frame
+ frame.release();
+ }
+ catch (final Throwable e)
+ {
+ output.release();
+ throw e;
+ }
+ finally
+ {
+ //release the old frame
+ frame.release();
+ }
+
+ return frame.with(output);
}
}
@@ -121,21 +156,32 @@ public interface FrameCompressor
byte[] input = CBUtil.readRawBytes(frame.body);
int maxCompressedLength = compressor.maxCompressedLength(input.length);
- byte[] output = new byte[INTEGER_BYTES + maxCompressedLength];
+ ByteBuf outputBuf = CBUtil.onHeapAllocator.buffer(INTEGER_BYTES + maxCompressedLength);
+
+ byte[] output = outputBuf.array();
+ int outputOffset = outputBuf.arrayOffset();
- output[0] = (byte) (input.length >>> 24);
- output[1] = (byte) (input.length >>> 16);
- output[2] = (byte) (input.length >>> 8);
- output[3] = (byte) (input.length);
+ output[outputOffset + 0] = (byte) (input.length >>> 24);
+ output[outputOffset + 1] = (byte) (input.length >>> 16);
+ output[outputOffset + 2] = (byte) (input.length >>> 8);
+ output[outputOffset + 3] = (byte) (input.length);
try
{
- int written = compressor.compress(input, 0, input.length, output, INTEGER_BYTES, maxCompressedLength);
- return frame.with(Unpooled.wrappedBuffer(output, 0, INTEGER_BYTES + written));
+ int written = compressor.compress(input, 0, input.length, output, outputOffset + INTEGER_BYTES, maxCompressedLength);
+ outputBuf.writerIndex(INTEGER_BYTES + written);
+
+ return frame.with(outputBuf);
+ }
+ catch (final Throwable e)
+ {
+ outputBuf.release();
+ throw e;
}
- catch (LZ4Exception e)
+ finally
{
- throw new IOException(e);
+ //release the old frame
+ frame.release();
}
}
@@ -148,19 +194,27 @@ public interface FrameCompressor
| ((input[2] & 0xFF) << 8)
| ((input[3] & 0xFF));
- byte[] output = new byte[uncompressedLength];
+ ByteBuf output = CBUtil.onHeapAllocator.buffer(uncompressedLength);
try
{
- int read = decompressor.decompress(input, INTEGER_BYTES, output, 0, uncompressedLength);
+ int read = decompressor.decompress(input, INTEGER_BYTES, output.array(), output.arrayOffset(), uncompressedLength);
if (read != input.length - INTEGER_BYTES)
throw new IOException("Compressed lengths mismatch");
- return frame.with(Unpooled.wrappedBuffer(output));
+ output.writerIndex(uncompressedLength);
+
+ return frame.with(output);
+ }
+ catch (final Throwable e)
+ {
+ output.release();
+ throw e;
}
- catch (LZ4Exception e)
+ finally
{
- throw new IOException(e);
+ //release the old frame
+ frame.release();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 32ae181..e39e02c 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
@@ -120,6 +119,7 @@ public abstract class Message
public final Type type;
protected volatile Connection connection;
private volatile int streamId;
+ private volatile Frame sourceFrame;
protected Message(Type type)
{
@@ -147,6 +147,16 @@ public abstract class Message
return streamId;
}
+ public void setSourceFrame(Frame sourceFrame)
+ {
+ this.sourceFrame = sourceFrame;
+ }
+
+ public Frame getSourceFrame()
+ {
+ return sourceFrame;
+ }
+
public static abstract class Request extends Message
{
protected boolean tracingRequested;
@@ -210,6 +220,7 @@ public abstract class Message
{
Message message = frame.header.type.codec.decode(frame.body, frame.header.version);
message.setStreamId(frame.header.streamId);
+ message.setSourceFrame(frame);
if (isRequest)
{
@@ -229,8 +240,9 @@ public abstract class Message
results.add(message);
}
- catch (Exception ex)
+ catch (Throwable ex)
{
+ frame.release();
// Remember the streamId
throw ErrorMessage.wrap(ex, frame.header.streamId);
}
@@ -256,24 +268,33 @@ public abstract class Message
UUID tracingId = ((Response)message).getTracingId();
if (tracingId != null)
{
- body = Unpooled.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
+ body = CBUtil.allocator.buffer(CBUtil.sizeOfUUID(tracingId) + messageSize);
CBUtil.writeUUID(tracingId, body);
flags.add(Frame.Header.Flag.TRACING);
}
else
{
- body = Unpooled.buffer(messageSize);
+ body = CBUtil.allocator.buffer(messageSize);
}
}
else
{
assert message instanceof Request;
- body = Unpooled.buffer(messageSize);
+ body = CBUtil.allocator.buffer(messageSize);
if (((Request)message).isTracingRequested())
flags.add(Frame.Header.Flag.TRACING);
}
- codec.encode(message, body, version);
+ try
+ {
+ codec.encode(message, body, version);
+ }
+ catch (Throwable e)
+ {
+ body.release();
+ throw ErrorMessage.wrap(e, message.getStreamId());
+ }
+
results.add(Frame.create(message.type, message.getStreamId(), version, flags, body));
}
}
@@ -281,6 +302,11 @@ public abstract class Message
@ChannelHandler.Sharable
public static class Dispatcher extends SimpleChannelInboundHandler<Request>
{
+ public Dispatcher()
+ {
+ super(false);
+ }
+
@Override
public void channelRead0(ChannelHandlerContext ctx, Request request)
{
@@ -295,16 +321,20 @@ public abstract class Message
Response response = request.execute(qstate);
response.setStreamId(request.getStreamId());
response.attach(connection);
+ response.setSourceFrame(request.getSourceFrame());
connection.applyStateTransition(request.type, response.type);
logger.debug("Responding: {}, v={}", response, connection.getVersion());
- ctx.channel().writeAndFlush(response);
+ ctx.writeAndFlush(response, ctx.voidPromise());
}
- catch (Exception ex)
+ catch (Throwable ex)
{
// Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID.
- ctx.channel().writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
+ ctx.writeAndFlush(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()), ctx.voidPromise());
+ }
+ finally {
+ request.getSourceFrame().release();
}
}
@@ -314,13 +344,13 @@ public abstract class Message
{
if (ctx.channel().isOpen())
{
- ChannelFuture future = ctx.channel().writeAndFlush(ErrorMessage.fromException(cause));
+ ChannelFuture future = ctx.writeAndFlush(ErrorMessage.fromException(cause));
// On protocol exception, close the channel as soon as the message have been sent
if (cause instanceof ProtocolException)
{
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
- ctx.channel().close();
+ ctx.close();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bc4b008b/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index eb2b043..9c43f09 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -27,6 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +78,7 @@ public class Server implements CassandraDaemon.Server
public final InetSocketAddress socket;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
- private EventLoopGroup bossGroup, workerGroup;
+ private EventLoopGroup workerGroup;
private EventExecutor eventExecutorGroup;
public Server(InetSocketAddress socket)
@@ -135,15 +138,16 @@ public class Server implements CassandraDaemon.Server
// Configure the server.
eventExecutorGroup = new RequestThreadPoolExecutor();
- bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
- .group(bossGroup, workerGroup)
+ .group(workerGroup)
.channel(NioServerSocketChannel.class)
- .childOption(ChannelOption.TCP_NODELAY, true);
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
+ .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
+ .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
- // Set up the event pipeline factory.
final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
if (clientEnc.enabled)
{
@@ -156,6 +160,7 @@ public class Server implements CassandraDaemon.Server
}
// Bind and start to accept incoming connections.
+ logger.info("Using Netty Version: {}", Version.identify().entrySet());
logger.info("Starting listening for CQL clients on {}...", socket);
Channel channel = bootstrap.bind(socket).channel();
connectionTracker.allChannels.add(channel);
@@ -178,9 +183,7 @@ public class Server implements CassandraDaemon.Server
{
// Close opened connections
connectionTracker.closeAll();
- bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
- bossGroup = null;
workerGroup = null;
eventExecutorGroup.shutdown();