You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/11/04 16:14:15 UTC
[1/2] git commit: Error out correctly when frame is too large
Updated Branches:
refs/heads/trunk a552b305f -> 2bbabe154
Error out correctly when frame is too large
patch by slebresne; reviewed by dnorberg for CASSANDRA-5981
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4a439d22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4a439d22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4a439d22
Branch: refs/heads/trunk
Commit: 4a439d22c53c658a1f05bf84f56a319312669a96
Parents: ce206e2
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Nov 4 16:12:02 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Nov 4 16:12:02 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 6 +
conf/cassandra.yaml | 4 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +
.../org/apache/cassandra/transport/Frame.java | 150 +++++++++++--------
6 files changed, 111 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70d295b..a6636a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
* Require Permission.SELECT for CAS updates (CASSANDRA-6247)
* New CQL-aware SSTableWriter (CASSANDRA-5894)
* Reject CAS operation when the protocol v1 is used (CASSANDRA-6270)
+ * Correctly throw error when frame too large (CASSANDRA-5981)
Merged from 1.2:
* Require logging in for Thrift CQL2/3 statement preparation (CASSANDRA-6254)
* restrict max_num_tokens to 1536 (CASSANDRA-6267)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a34947b..b89be56 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -15,6 +15,12 @@ using the provided 'sstableupgrade' tool.
2.0.3
=====
+
+New features
+------------
+ - It's now possible to configure the maximum allowed size of the native
+ protocol frames (native_transport_max_frame_size_in_mb in the yaml file).
+
Upgrading
---------
- The IEndpointStateChangeSubscriber has a new method, beforeChange, that
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 455421a..b2e3a42 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -315,6 +315,10 @@ native_transport_port: 9042
# there is no native_transport_min_threads, idle threads will always be stopped
# after 30 seconds).
# native_transport_max_threads: 128
+#
+# The maximum allowed size of a frame. Frames (requests) larger than this will
+# be rejected as invalid. The default is 256MB.
+# native_transport_max_frame_size_in_mb: 256
# Whether to start the thrift rpc server.
start_rpc: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8f0f22e..3a7407e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -96,6 +96,7 @@ public class Config
public Boolean start_native_transport = false;
public Integer native_transport_port = 9042;
public Integer native_transport_max_threads = 128;
+ public Integer native_transport_max_frame_size_in_mb = 256;
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index aca58b7..7cdc4e6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -323,6 +323,9 @@ public class DatabaseDescriptor
if (conf.thrift_framed_transport_size_in_mb <= 0)
throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
+ if (conf.native_transport_max_frame_size_in_mb <= 0)
+ throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive");
+
/* end point snitch */
if (conf.endpoint_snitch == null)
{
@@ -1028,6 +1031,11 @@ public class DatabaseDescriptor
return conf.native_transport_max_threads;
}
+ public static int getNativeTransportMaxFrameSize()
+ {
+ return conf.native_transport_max_frame_size_in_mb * 1024 * 1024;
+ }
+
public static double getCommitLogSyncBatchWindow()
{
return conf.commitlog_sync_batch_window_in_ms;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a439d22/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 0d5185c..6472b39 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -28,6 +28,9 @@ import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
import org.jboss.netty.handler.codec.frame.*;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
public class Frame
@@ -52,27 +55,6 @@ public class Frame
this.body = body;
}
- public static Frame create(ChannelBuffer fullFrame)
- {
- assert fullFrame.readableBytes() >= Header.LENGTH : String.format("Frame too short (%d bytes = %s)",
- fullFrame.readableBytes(),
- ByteBufferUtil.bytesToHex(fullFrame.toByteBuffer()));
-
- int version = fullFrame.readByte();
- int flags = fullFrame.readByte();
- int streamId = fullFrame.readByte();
- int opcode = fullFrame.readByte();
- int length = fullFrame.readInt();
- assert length == fullFrame.readableBytes();
-
- // version first byte is the "direction" of the frame (request or response)
- Message.Direction direction = Message.Direction.extractFromVersion(version);
- version = version & 0x7F;
-
- Header header = new Header(version, flags, streamId, Message.Type.fromOpcode(opcode, direction));
- return new Frame(header, fullFrame);
- }
-
public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body)
{
Header header = new Header(version, flags, streamId, type);
@@ -83,6 +65,9 @@ public class Frame
{
public static final int LENGTH = 8;
+ public static final int BODY_LENGTH_OFFSET = 4;
+ public static final int BODY_LENGTH_SIZE = 4;
+
public final int version;
public final EnumSet<Flag> flags;
public final int streamId;
@@ -134,15 +119,19 @@ public class Frame
return new Frame(header, newBody);
}
- public static class Decoder extends LengthFieldBasedFrameDecoder
+ public static class Decoder extends FrameDecoder
{
- private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB
+ private static final int MAX_FRAME_LENGTH = DatabaseDescriptor.getNativeTransportMaxFrameSize();
+
+ private boolean discardingTooLongFrame;
+ private long tooLongFrameLength;
+ private long bytesToDiscard;
+ private int tooLongStreamId;
private final Connection.Factory factory;
public Decoder(Connection.Factory factory)
{
- super(MAX_FRAME_LENTH, 4, 4, 0, 0, true);
this.factory = factory;
}
@@ -150,55 +139,98 @@ public class Frame
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
throws Exception
{
- try
+ if (discardingTooLongFrame)
{
- // We must at least validate that the frame version is something we support/know and it doesn't hurt to
- // check the opcode is not garbage. And we should do that indenpently of what is the the bytes corresponding
- // to the frame length are, i.e. we shouldn't wait for super.decode() to return non-null.
- if (buffer.readableBytes() == 0)
- return null;
+ bytesToDiscard = discard(buffer, bytesToDiscard);
+ // If we have discarded everything, throw the exception
+ if (bytesToDiscard <= 0)
+ fail();
+ return null;
+ }
- int firstByte = buffer.getByte(0);
- Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
- int version = firstByte & 0x7F;
+ // Wait until we have read at least the header
+ if (buffer.readableBytes() < Header.LENGTH)
+ return null;
- if (version > Server.CURRENT_VERSION)
- throw new ProtocolException("Invalid or unsupported protocol version: " + version);
+ int idx = buffer.readerIndex();
- // Validate the opcode
- if (buffer.readableBytes() >= 4)
- Message.Type.fromOpcode(buffer.getByte(3), direction);
+ int firstByte = buffer.getByte(idx);
+ Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
+ int version = firstByte & 0x7F;
- ChannelBuffer frame = (ChannelBuffer) super.decode(ctx, channel, buffer);
- if (frame == null)
- {
- return null;
- }
+ if (version > Server.CURRENT_VERSION)
+ throw new ProtocolException("Invalid or unsupported protocol version: " + version);
- Connection connection = (Connection)channel.getAttachment();
- if (connection == null)
- {
- // First message seen on this channel, attach the connection object
- connection = factory.newConnection(channel, version);
- channel.setAttachment(connection);
- }
- else if (connection.getVersion() != version)
- {
- throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
- }
- return Frame.create(frame);
+ int flags = buffer.getByte(idx + 1);
+ int streamId = buffer.getByte(idx + 2);
+
+ // This throws a protocol exception if the opcode is unknown
+ Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+
+ long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
+
+ if (bodyLength < 0)
+ {
+ buffer.skipBytes(Header.LENGTH);
+ throw new ProtocolException("Invalid frame body length: " + bodyLength);
+ }
+
+ long frameLength = bodyLength + Header.LENGTH;
+ if (frameLength > MAX_FRAME_LENGTH)
+ {
+ // Enter the discard mode and discard everything received so far.
+ discardingTooLongFrame = true;
+ tooLongStreamId = streamId;
+ tooLongFrameLength = frameLength;
+ bytesToDiscard = discard(buffer, frameLength);
+ if (bytesToDiscard <= 0)
+ fail();
+ return null;
}
- catch (CorruptedFrameException e)
+
+ // never overflows because it's less than the max frame length
+ int frameLengthInt = (int) frameLength;
+ if (buffer.readableBytes() < frameLengthInt)
+ return null;
+
+ // extract body
+ ChannelBuffer body = extractFrame(buffer, idx + Header.LENGTH, (int)bodyLength);
+ buffer.readerIndex(idx + frameLengthInt);
+
+ Connection connection = (Connection)channel.getAttachment();
+ if (connection == null)
{
- throw new ProtocolException(e.getMessage());
+ // First message seen on this channel, attach the connection object
+ connection = factory.newConnection(channel, version);
+ channel.setAttachment(connection);
}
- catch (TooLongFrameException e)
+ else if (connection.getVersion() != version)
{
- throw new ProtocolException(e.getMessage());
+ throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion()));
}
+
+ return new Frame(new Header(version, flags, streamId, type), body);
+ }
+
+ private void fail()
+ {
+ // Reset to the initial state and throw the exception
+ long tooLongFrameLength = this.tooLongFrameLength;
+ this.tooLongFrameLength = 0;
+ discardingTooLongFrame = false;
+ String msg = String.format("Request is too big: length %d exceeds maximum allowed length %d.", tooLongFrameLength, MAX_FRAME_LENGTH);
+ throw ErrorMessage.wrap(new InvalidRequestException(msg), tooLongStreamId);
}
}
+ // How much remains to be discarded
+ private static long discard(ChannelBuffer buffer, long remainingToDiscard)
+ {
+ int availableToDiscard = (int) Math.min(remainingToDiscard, buffer.readableBytes());
+ buffer.skipBytes(availableToDiscard);
+ return remainingToDiscard - availableToDiscard;
+ }
+
public static class Encoder extends OneToOneEncoder
{
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
[2/2] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bbabe15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bbabe15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bbabe15
Branch: refs/heads/trunk
Commit: 2bbabe154e350d4b1b69cb8d8bb2290d620db81d
Parents: a552b30 4a439d2
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Nov 4 16:13:06 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Nov 4 16:13:06 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 6 +
conf/cassandra.yaml | 4 +
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +
.../org/apache/cassandra/transport/Frame.java | 150 +++++++++++--------
6 files changed, 111 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bbabe15/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bbabe15/NEWS.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bbabe15/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bbabe15/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bbabe15/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------