You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/07 10:56:18 UTC

git commit: Simplify and generalize startup message in binary protocol

Updated Branches:
  refs/heads/trunk d5fc1932e -> f1711794c


Simplify and generalize startup message in binary protocol

patch by slebresne; reviewed by thepaul for CASSANDRA-4539


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1711794
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1711794
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1711794

Branch: refs/heads/trunk
Commit: f1711794cf911c349a3b60e0104064adb8124148
Parents: d5fc193
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Sep 7 10:55:13 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 7 10:55:13 2012 +0200

----------------------------------------------------------------------
 doc/native_protocol.spec                           |   30 +++---
 .../org/apache/cassandra/transport/CBUtil.java     |   48 +++++++++
 .../org/apache/cassandra/transport/Client.java     |    7 +-
 .../apache/cassandra/transport/SimpleClient.java   |    7 +-
 .../transport/messages/OptionsMessage.java         |   20 +++-
 .../transport/messages/StartupMessage.java         |   80 +++------------
 .../transport/messages/SupportedMessage.java       |   23 ++---
 7 files changed, 109 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index d19963b..1de7a95 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -161,6 +161,11 @@ Table of Contents
                    will be described when this is used.
     [option list]  A [short] n, followed by n [option].
 
+    [string map]      A [short] n, followed by n pair <k><v> where <k> and <v>
+                      are [string].
+    [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
+                      [string] and <v> is a [string list].
+
 
 4. Messages
 
@@ -176,19 +181,16 @@ Table of Contents
   (in which case credentials will need to be provided using CREDENTIALS).
 
   This must be the first message of the connection, except for OPTIONS that can
-  be sent before to find out the option supported by the server. Once the
+  be sent before to find out the options supported by the server. Once the
   connection has been initialized, a client should not send any more STARTUP
   message.
 
-  The body is defined as:
-    <version><options>
-  where:
-    - <version> is a [string] representing the version of the CQL version to use.
-      Currently the only version supported is 3.0.0. Note that this is different
-      from the protocol version.
-    - <options> is an [option list]. Valid option ids are:
-        0x0001    Compression: the value is a [string] representing the
-                  algorithm to use (See section 5).
+  The body is a [string map] of options. Possible options are:
+    - "CQL_VERSION": the version of CQL to use. This option is mandatory and
+      currenty, the only version supported is "3.0.0". Note that this is
+      different from the protocol version.
+    - "COMPRESSION": the compression algorithm to use for frames (See section 5).
+      This is optional, if not specified no compression will be used.
 
 
 4.1.2. CREDENTIALS
@@ -284,10 +286,8 @@ Table of Contents
   Indicates which startup options are supported by the server. This message
   comes as a response to an OPTIONS message.
 
-  The body of a SUPPORTED message is a [string list] indicating which CQL
-  version the server support, followed by a second [string list] indicating
-  which compression algorithm is supported, if any (at the time of this writing,
-  only snappy compression is available if the library is in the classpath).
+  The body of a SUPPORTED message is a [string multimap]. This multimap gives
+  for each of the supported STARTUP options, the list of supported values.
 
 
 4.2.5. RESULT
@@ -397,7 +397,7 @@ Table of Contents
   use, which is done in the STARTUP message. As a consequence, a STARTUP message
   must never be compressed.  However, once the STARTUP frame has been received
   by the server can be compressed (including the response to the STARTUP
-  request). Frame do not have to compressed however, even if compression has
+  request). Frame do not have to be compressed however, even if compression has
   been agreed upon (a server may only compress frame above a certain size at its
   discretion). A frame body should be compressed if and only if the compressed
   flag (see Section 2.2) is set.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 791f097..44ae64d 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.transport;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -128,6 +130,52 @@ public abstract class CBUtil
             cb.writeBytes(stringToCB(str));
     }
 
+    public static Map<String, String> readStringMap(ChannelBuffer cb)
+    {
+        int length = cb.readUnsignedShort();
+        Map<String, String> m = new HashMap<String, String>(length);
+        for (int i = 0; i < length; i++)
+        {
+            String k = readString(cb).toUpperCase();
+            String v = readString(cb);
+            m.put(k, v);
+        }
+        return m;
+    }
+
+    public static void writeStringMap(ChannelBuffer cb, Map<String, String> m)
+    {
+        cb.writeShort(m.size());
+        for (Map.Entry<String, String> entry : m.entrySet())
+        {
+            cb.writeBytes(stringToCB(entry.getKey()));
+            cb.writeBytes(stringToCB(entry.getValue()));
+        }
+    }
+
+    public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb)
+    {
+        int length = cb.readUnsignedShort();
+        Map<String, List<String>> m = new HashMap<String, List<String>>(length);
+        for (int i = 0; i < length; i++)
+        {
+            String k = readString(cb).toUpperCase();
+            List<String> v = readStringList(cb);
+            m.put(k, v);
+        }
+        return m;
+    }
+
+    public static void writeStringToStringListMap(ChannelBuffer cb, Map<String, List<String>> m)
+    {
+        cb.writeShort(m.size());
+        for (Map.Entry<String, List<String>> entry : m.entrySet())
+        {
+            cb.writeBytes(stringToCB(entry.getKey()));
+            writeStringList(cb, entry.getValue());
+        }
+    }
+
     public static ByteBuffer readValue(ChannelBuffer cb)
     {
         int length = cb.readInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index c65522e..2cf815f 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -82,17 +82,18 @@ public class Client extends SimpleClient
         String msgType = iter.next().toUpperCase();
         if (msgType.equals("STARTUP"))
         {
-            EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+            Map<String, String> options = new HashMap<String, String>();
+            options.put(StartupMessage.CQL_VERSION, "3.0.0");
             while (iter.hasNext())
             {
                String next = iter.next();
                if (next.toLowerCase().equals("snappy"))
                {
-                   options.put(StartupMessage.Option.COMPRESSION, "snappy");
+                   options.put(StartupMessage.COMPRESSION, "snappy");
                    connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
                }
             }
-            return new StartupMessage("3.0.0", options);
+            return new StartupMessage(options);
         }
         else if (msgType.equals("QUERY"))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 9e0ea2e..66cf5ae 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -74,13 +74,14 @@ public class SimpleClient
     {
         establishConnection();
 
-        EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+        Map<String, String> options = new HashMap<String, String>();
+        options.put(StartupMessage.CQL_VERSION, "3.0.0");
         if (useCompression)
         {
-            options.put(StartupMessage.Option.COMPRESSION, "snappy");
+            options.put(StartupMessage.COMPRESSION, "snappy");
             connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
         }
-        execute(new StartupMessage("3.0.0", options));
+        execute(new StartupMessage(options));
     }
 
     protected void establishConnection() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 1a028de..cecead2 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.transport.messages;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 
@@ -54,11 +59,18 @@ public class OptionsMessage extends Message.Request
 
     public Message.Response execute()
     {
-        SupportedMessage supported = new SupportedMessage();
-        supported.cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
+        List<String> cqlVersions = new ArrayList<String>();
+        cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
+
+        List<String> compressions = new ArrayList<String>();
         if (FrameCompressor.SnappyCompressor.instance != null)
-            supported.compressions.add("snappy");
-        return supported;
+            compressions.add("snappy");
+
+        Map<String, List<String>> supported = new HashMap<String, List<String>>();
+        supported.put(StartupMessage.CQL_VERSION, cqlVersions);
+        supported.put(StartupMessage.COMPRESSION, compressions);
+
+        return new SupportedMessage(supported);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index fc28b69..23199f1 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -34,83 +34,29 @@ import org.apache.cassandra.utils.SemanticVersion;
  */
 public class StartupMessage extends Message.Request
 {
-    public enum Option implements OptionCodec.Codecable<Option>
-    {
-        COMPRESSION(1);
-
-        private final int id;
-
-        private Option(int id)
-        {
-            this.id = id;
-        }
-
-        public int getId()
-        {
-            return id;
-        }
-
-        public Object readValue(ChannelBuffer cb)
-        {
-            switch (this)
-            {
-                case COMPRESSION:
-                    return CBUtil.readString(cb);
-                default:
-                    throw new AssertionError();
-            }
-        }
-
-        public void writeValue(Object value, ChannelBuffer cb)
-        {
-            switch (this)
-            {
-                case COMPRESSION:
-                    assert value instanceof String;
-                    cb.writeBytes(CBUtil.stringToCB((String)value));
-                    break;
-            }
-        }
-
-        public int serializedValueSize(Object value)
-        {
-            switch (this)
-            {
-                case COMPRESSION:
-                    return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
-                default:
-                    throw new AssertionError();
-            }
-        }
-    }
-
-    private static OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class);
+    public static final String CQL_VERSION = "CQL_VERSION";
+    public static final String COMPRESSION = "COMPRESSION";
 
     public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
     {
         public StartupMessage decode(ChannelBuffer body)
         {
-            String verString = CBUtil.readString(body);
-
-            Map<Option, Object> options = optionCodec.decode(body);
-            return new StartupMessage(verString, options);
+            return new StartupMessage(CBUtil.readStringMap(body));
         }
 
         public ChannelBuffer encode(StartupMessage msg)
         {
-            ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion);
-            ChannelBuffer ocb = optionCodec.encode(msg.options);
-            return ChannelBuffers.wrappedBuffer(vcb, ocb);
+            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringMap(cb, msg.options);
+            return cb;
         }
     };
 
-    public final String cqlVersion;
-    public final Map<Option, Object> options;
+    public final Map<String, String> options;
 
-    public StartupMessage(String cqlVersion, Map<Option, Object> options)
+    public StartupMessage(Map<String, String> options)
     {
         super(Message.Type.STARTUP);
-        this.cqlVersion = cqlVersion;
         this.options = options;
     }
 
@@ -123,13 +69,17 @@ public class StartupMessage extends Message.Request
     {
         try
         {
+            String cqlVersion = options.get(CQL_VERSION);
+            if (cqlVersion == null)
+                throw new ProtocolException("Missing value CQL_VERSION in STARTUP message");
+
             connection.clientState().setCQLVersion(cqlVersion);
             if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0)
                 throw new ProtocolException(String.format("CQL version %s is not support by the binary protocol (supported version are >= 3.0.0)", cqlVersion));
 
-            if (options.containsKey(Option.COMPRESSION))
+            if (options.containsKey(COMPRESSION))
             {
-                String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase();
+                String compression = options.get(COMPRESSION).toLowerCase();
                 if (compression.equals("snappy"))
                 {
                     if (FrameCompressor.SnappyCompressor.instance == null)
@@ -156,6 +106,6 @@ public class StartupMessage extends Message.Request
     @Override
     public String toString()
     {
-        return "STARTUP cqlVersion=" + cqlVersion;
+        return "STARTUP " + options;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 0b1a6b5..fe0fa77 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport.messages;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -35,33 +36,23 @@ public class SupportedMessage extends Message.Response
     {
         public SupportedMessage decode(ChannelBuffer body)
         {
-            List<String> versions = CBUtil.readStringList(body);
-            List<String> compressions = CBUtil.readStringList(body);
-            return new SupportedMessage(versions, compressions);
+            return new SupportedMessage(CBUtil.readStringToStringListMap(body));
         }
 
         public ChannelBuffer encode(SupportedMessage msg)
         {
             ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
-            CBUtil.writeStringList(cb, msg.cqlVersions);
-            CBUtil.writeStringList(cb, msg.compressions);
+            CBUtil.writeStringToStringListMap(cb, msg.supported);
             return cb;
         }
     };
 
-    public final List<String> cqlVersions;
-    public final List<String> compressions;
+    public final Map<String, List<String>> supported;
 
-    public SupportedMessage()
-    {
-        this(new ArrayList<String>(), new ArrayList<String>());
-    }
-
-    private SupportedMessage(List<String> cqlVersions, List<String> compressions)
+    public SupportedMessage(Map<String, List<String>> supported)
     {
         super(Message.Type.SUPPORTED);
-        this.cqlVersions = cqlVersions;
-        this.compressions = compressions;
+        this.supported = supported;
     }
 
     public ChannelBuffer encode()
@@ -72,6 +63,6 @@ public class SupportedMessage extends Message.Response
     @Override
     public String toString()
     {
-        return "SUPPORTED versions=" + cqlVersions + " compressions=" + compressions;
+        return "SUPPORTED " + supported;
     }
 }