You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/05/29 17:10:11 UTC

git commit: Support more concurrent requests in native protocol

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 969a92804 -> a4bea74ae


Support more concurrent requests in native protocol

patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-7231


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a4bea74a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a4bea74a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a4bea74a

Branch: refs/heads/cassandra-2.1
Commit: a4bea74aeb9dcd8a2104ae1a7d4660a65af7420f
Parents: 969a928
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu May 29 18:08:37 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu May 29 18:08:37 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/native_protocol_v3.spec                     | 19 ++---
 .../org/apache/cassandra/transport/Frame.java   | 85 ++++++++++++++------
 .../org/apache/cassandra/transport/Server.java  |  4 +-
 4 files changed, 74 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a4bea74a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8448ea6..2fc6751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc1
+ * Support more concurrent requests in native protocol (CASSANDRA-7231)
  * Add tab-completion to debian nodetool packaging (CASSANDRA-6421)
  * Change concurrent_compactors defaults (CASSANDRA-7139)
  * Add PowerShell Windows launch scripts (CASSANDRA-7001)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a4bea74a/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
index 400868c..5e9f439 100644
--- a/doc/native_protocol_v3.spec
+++ b/doc/native_protocol_v3.spec
@@ -48,10 +48,10 @@ Table of Contents
 
   The CQL binary protocol is a frame based protocol. Frames are defined as:
 
-      0         8        16        24        32
-      +---------+---------+---------+---------+
-      | version |  flags  | stream  | opcode  |
-      +---------+---------+---------+---------+
+      0         8        16        24        32         40
+      +---------+---------+---------+---------+---------+
+      | version |  flags  |      stream       | opcode  |
+      +---------+---------+---------+---------+---------+
       |                length                 |
       +---------+---------+---------+---------+
       |                                       |
@@ -62,7 +62,7 @@ Table of Contents
 
   The protocol is big-endian (network byte order).
 
-  Each frame contains a fixed size header (8 bytes) followed by a variable size
+  Each frame contains a fixed size header (9 bytes) followed by a variable size
   body. The header is described in Section 2. The content of the body depends
   on the header opcode value (the body can in particular be empty for some
   opcode values). The list of allowed opcode is defined Section 2.3 and the
@@ -129,8 +129,8 @@ Table of Contents
 
 2.3. stream
 
-  A frame has a stream id (one signed byte). When sending request messages, this
-  stream id must be set by the client to a positive byte (negative stream id
+  A frame has a stream id (a [short] value). When sending request messages, this
+  stream id must be set by the client to a non-negative value (negative stream id
   are reserved for streams initiated by the server; currently all EVENT messages
   (section 4.2.6) have a streamId of -1). If a client sends a request message
   with the stream id X, it is guaranteed that the stream id of the response to
@@ -142,13 +142,13 @@ Table of Contents
   writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
   respond to REQ_3 (or REQ_2) first. Assigning different stream id to these 3
   requests allows the client to distinguish to which request an received answer
-  respond to. As there can only be 128 different simultaneous stream, it is up
+  respond to. As there can only be 32768 different simultaneous streams, it is up
   to the client to reuse stream id.
 
   Note that clients are free to use the protocol synchronously (i.e. wait for
   the response to REQ_N before sending REQ_N+1). In that case, the stream id
   can be safely set to 0. Clients should also feel free to use only a subset of
-  the 128 maximum possible stream ids if it is simpler for those
+  the 32768 maximum possible stream ids if it is simpler for those
   implementation.
 
 2.4. opcode
@@ -902,6 +902,7 @@ Table of Contents
               bytes] representing the unknown ID.
 
 10. Changes from v2
+  * stream id is now 2 bytes long (a [short] value), so the header is now 1 byte longer (9 bytes total).
   * BATCH messages now have <flags> (like QUERY and EXECUTE) and a corresponding optional
     <serial_consistency> parameters (see Section 4.1.7).
   * User Defined Types and tuple types have to added to ResultSet metadata (see 4.2.5.2) and a

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a4bea74a/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index bec3c96..3e66ff7 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -39,8 +38,19 @@ public class Frame
     public final ByteBuf body;
 
     /**
-     * On-wire frame.
-     * Frames are defined as:
+     * An on-wire frame consists of a header and a body.
+     *
+     * The header is defined the following way in native protocol version 3 and later:
+     *
+     *   0         8        16        24        32         40
+     *   +---------+---------+---------+---------+---------+
+     *   | version |  flags  |      stream       | opcode  |
+     *   +---------+---------+---------+---------+---------+
+     *   |                length                 |
+     *   +---------+---------+---------+---------+
+     *
+     *
+     * In versions 1 and 2 the header has a smaller (1 byte) stream id, and is thus defined the following way:
      *
      *   0         8        16        24        32
      *   +---------+---------+---------+---------+
@@ -68,9 +78,10 @@ public class Frame
 
     public static class Header
     {
-        public static final int LENGTH = 8;
+        // 8 bytes in protocol versions 1 and 2, 9 bytes in protocol version 3 and later
+        public static final int MODERN_LENGTH = 9;
+        public static final int LEGACY_LENGTH = 8;
 
-        public static final int BODY_LENGTH_OFFSET = 4;
         public static final int BODY_LENGTH_SIZE = 4;
 
         public final int version;
@@ -153,34 +164,52 @@ public class Frame
                 return;
             }
 
-            // Wait until we have read at least the header
-            if (buffer.readableBytes() < Header.LENGTH)
+            // Wait until we have read at least the short header
+            if (buffer.readableBytes() < Header.LEGACY_LENGTH)
                 return;
 
             int idx = buffer.readerIndex();
 
-            int firstByte = buffer.getByte(idx);
+            int firstByte = buffer.getByte(idx++);
             Message.Direction direction = Message.Direction.extractFromVersion(firstByte);
             int version = firstByte & 0x7F;
 
             if (version > Server.CURRENT_VERSION)
                 throw new ProtocolException("Invalid or unsupported protocol version: " + version);
 
-            int flags = buffer.getByte(idx + 1);
-            int streamId = buffer.getByte(idx + 2);
+            // Wait until we have the complete V3+ header
+            if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH)
+                return;
+
+            int flags = buffer.getByte(idx++);
+
+            int streamId, headerLength;
+            if (version >= Server.VERSION_3)
+            {
+                streamId = buffer.getShort(idx);
+                idx += 2;
+                headerLength = Header.MODERN_LENGTH;
+            }
+            else
+            {
+                streamId = buffer.getByte(idx);
+                idx++;
+                headerLength = Header.LEGACY_LENGTH;
+            }
 
             // This throws a protocol exceptions if the opcode is unknown
-            Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx + 3), direction);
+            Message.Type type = Message.Type.fromOpcode(buffer.getByte(idx++), direction);
 
-            long bodyLength = buffer.getUnsignedInt(idx + Header.BODY_LENGTH_OFFSET);
+            long bodyLength = buffer.getUnsignedInt(idx);
+            idx += Header.BODY_LENGTH_SIZE;
 
             if (bodyLength < 0)
             {
-                buffer.skipBytes(Header.LENGTH);
+                buffer.skipBytes(headerLength);
                 throw new ProtocolException("Invalid frame body length: " + bodyLength);
             }
 
-            long frameLength = bodyLength + Header.LENGTH;
+            long frameLength = bodyLength + headerLength;
             if (frameLength > MAX_FRAME_LENGTH)
             {
                 // Enter the discard mode and discard everything received so far.
@@ -193,14 +222,13 @@ public class Frame
                 return;
             }
 
-            // never overflows because it's less than the max frame length
-            int frameLengthInt = (int) frameLength;
-            if (buffer.readableBytes() < frameLengthInt)
+            if (buffer.readableBytes() < frameLength)
                 return;
 
             // extract body
-            ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx + Header.LENGTH, (int) bodyLength));
-            buffer.readerIndex(idx + frameLengthInt);
+            ByteBuf body = CBUtil.allocator.buffer((int) bodyLength).writeBytes(buffer.duplicate().slice(idx, (int) bodyLength));
+            idx += bodyLength;
+            buffer.readerIndex(idx);
 
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
             if (connection == null)
@@ -239,14 +267,23 @@ public class Frame
     @ChannelHandler.Sharable
     public static class Encoder extends MessageToMessageEncoder<Frame>
     {
-        public void encode(ChannelHandlerContext ctx, Frame frame, List results)
+        public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
-            ByteBuf header = CBUtil.allocator.buffer(Frame.Header.LENGTH);
+            int headerLength = frame.header.version >= Server.VERSION_3
+                             ? Header.MODERN_LENGTH
+                             : Header.LEGACY_LENGTH;
+            ByteBuf header = CBUtil.allocator.buffer(headerLength);
+
             Message.Type type = frame.header.type;
             header.writeByte(type.direction.addToVersion(frame.header.version));
             header.writeByte(Header.Flag.serialize(frame.header.flags));
-            header.writeByte(frame.header.streamId);
+
+            if (frame.header.version >= Server.VERSION_3)
+                header.writeShort(frame.header.streamId);
+            else
+                header.writeByte(frame.header.streamId);
+
             header.writeByte(type.opcode);
             header.writeInt(frame.body.readableBytes());
 
@@ -257,7 +294,7 @@ public class Frame
     @ChannelHandler.Sharable
     public static class Decompressor extends MessageToMessageDecoder<Frame>
     {
-        public void decode(ChannelHandlerContext ctx, Frame frame, List results)
+        public void decode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();
@@ -282,7 +319,7 @@ public class Frame
     @ChannelHandler.Sharable
     public static class Compressor extends MessageToMessageEncoder<Frame>
     {
-        public void encode(ChannelHandlerContext ctx, Frame frame, List results)
+        public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results)
         throws IOException
         {
             Connection connection = ctx.channel().attr(Connection.attributeKey).get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a4bea74a/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 9c43f09..2fed889 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -62,8 +62,8 @@ public class Server implements CassandraDaemon.Server
 
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
 
-    /** current version of the native protocol we support */
-    public static final int CURRENT_VERSION = 2;
+    public static final int VERSION_3 = 3;
+    public static final int CURRENT_VERSION = VERSION_3;
 
     private final ConnectionTracker connectionTracker = new ConnectionTracker();