You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/07 10:56:18 UTC
git commit: Simplify and generalize startup message in binary protocol
Updated Branches:
refs/heads/trunk d5fc1932e -> f1711794c
Simplify and generalize startup message in binary protocol
patch by slebresne; reviewed by thepaul for CASSANDRA-4539
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1711794
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1711794
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1711794
Branch: refs/heads/trunk
Commit: f1711794cf911c349a3b60e0104064adb8124148
Parents: d5fc193
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Sep 7 10:55:13 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 7 10:55:13 2012 +0200
----------------------------------------------------------------------
doc/native_protocol.spec | 30 +++---
.../org/apache/cassandra/transport/CBUtil.java | 48 +++++++++
.../org/apache/cassandra/transport/Client.java | 7 +-
.../apache/cassandra/transport/SimpleClient.java | 7 +-
.../transport/messages/OptionsMessage.java | 20 +++-
.../transport/messages/StartupMessage.java | 80 +++------------
.../transport/messages/SupportedMessage.java | 23 ++---
7 files changed, 109 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index d19963b..1de7a95 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -161,6 +161,11 @@ Table of Contents
will be described when this is used.
[option list] A [short] n, followed by n [option].
+ [string map] A [short] n, followed by n pairs <k><v> where <k> and <v>
+ are [string].
+ [string multimap] A [short] n, followed by n pairs <k><v> where <k> is a
+ [string] and <v> is a [string list].
+
4. Messages
@@ -176,19 +181,16 @@ Table of Contents
(in which case credentials will need to be provided using CREDENTIALS).
This must be the first message of the connection, except for OPTIONS that can
- be sent before to find out the option supported by the server. Once the
+ be sent before to find out the options supported by the server. Once the
connection has been initialized, a client should not send any more STARTUP
message.
- The body is defined as:
- <version><options>
- where:
- - <version> is a [string] representing the version of the CQL version to use.
- Currently the only version supported is 3.0.0. Note that this is different
- from the protocol version.
- - <options> is an [option list]. Valid option ids are:
- 0x0001 Compression: the value is a [string] representing the
- algorithm to use (See section 5).
+ The body is a [string map] of options. Possible options are:
+ - "CQL_VERSION": the version of CQL to use. This option is mandatory and
+ currently, the only version supported is "3.0.0". Note that this is
+ different from the protocol version.
+ - "COMPRESSION": the compression algorithm to use for frames (See section 5).
+ This is optional, if not specified no compression will be used.
4.1.2. CREDENTIALS
@@ -284,10 +286,8 @@ Table of Contents
Indicates which startup options are supported by the server. This message
comes as a response to an OPTIONS message.
- The body of a SUPPORTED message is a [string list] indicating which CQL
- version the server support, followed by a second [string list] indicating
- which compression algorithm is supported, if any (at the time of this writing,
- only snappy compression is available if the library is in the classpath).
+ The body of a SUPPORTED message is a [string multimap]. This multimap gives
+ for each of the supported STARTUP options, the list of supported values.
4.2.5. RESULT
@@ -397,7 +397,7 @@ Table of Contents
use, which is done in the STARTUP message. As a consequence, a STARTUP message
must never be compressed. However, once the STARTUP frame has been received
by the server can be compressed (including the response to the STARTUP
- request). Frame do not have to compressed however, even if compression has
+ request). Frame do not have to be compressed however, even if compression has
been agreed upon (a server may only compress frame above a certain size at its
discretion). A frame body should be compressed if and only if the compressed
flag (see Section 2.2) is set.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 791f097..44ae64d 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -20,7 +20,9 @@ package org.apache.cassandra.transport;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -128,6 +130,52 @@ public abstract class CBUtil
cb.writeBytes(stringToCB(str));
}
+ public static Map<String, String> readStringMap(ChannelBuffer cb)
+ {
+ int length = cb.readUnsignedShort();
+ Map<String, String> m = new HashMap<String, String>(length);
+ for (int i = 0; i < length; i++)
+ {
+ String k = readString(cb).toUpperCase();
+ String v = readString(cb);
+ m.put(k, v);
+ }
+ return m;
+ }
+
+ public static void writeStringMap(ChannelBuffer cb, Map<String, String> m)
+ {
+ cb.writeShort(m.size());
+ for (Map.Entry<String, String> entry : m.entrySet())
+ {
+ cb.writeBytes(stringToCB(entry.getKey()));
+ cb.writeBytes(stringToCB(entry.getValue()));
+ }
+ }
+
+ public static Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb)
+ {
+ int length = cb.readUnsignedShort();
+ Map<String, List<String>> m = new HashMap<String, List<String>>(length);
+ for (int i = 0; i < length; i++)
+ {
+ String k = readString(cb).toUpperCase();
+ List<String> v = readStringList(cb);
+ m.put(k, v);
+ }
+ return m;
+ }
+
+ public static void writeStringToStringListMap(ChannelBuffer cb, Map<String, List<String>> m)
+ {
+ cb.writeShort(m.size());
+ for (Map.Entry<String, List<String>> entry : m.entrySet())
+ {
+ cb.writeBytes(stringToCB(entry.getKey()));
+ writeStringList(cb, entry.getValue());
+ }
+ }
+
public static ByteBuffer readValue(ChannelBuffer cb)
{
int length = cb.readInt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index c65522e..2cf815f 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -82,17 +82,18 @@ public class Client extends SimpleClient
String msgType = iter.next().toUpperCase();
if (msgType.equals("STARTUP"))
{
- EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(StartupMessage.CQL_VERSION, "3.0.0");
while (iter.hasNext())
{
String next = iter.next();
if (next.toLowerCase().equals("snappy"))
{
- options.put(StartupMessage.Option.COMPRESSION, "snappy");
+ options.put(StartupMessage.COMPRESSION, "snappy");
connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
}
}
- return new StartupMessage("3.0.0", options);
+ return new StartupMessage(options);
}
else if (msgType.equals("QUERY"))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 9e0ea2e..66cf5ae 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -74,13 +74,14 @@ public class SimpleClient
{
establishConnection();
- EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class);
+ Map<String, String> options = new HashMap<String, String>();
+ options.put(StartupMessage.CQL_VERSION, "3.0.0");
if (useCompression)
{
- options.put(StartupMessage.Option.COMPRESSION, "snappy");
+ options.put(StartupMessage.COMPRESSION, "snappy");
connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
}
- execute(new StartupMessage("3.0.0", options));
+ execute(new StartupMessage(options));
}
protected void establishConnection() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 1a028de..cecead2 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -17,6 +17,11 @@
*/
package org.apache.cassandra.transport.messages;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -54,11 +59,18 @@ public class OptionsMessage extends Message.Request
public Message.Response execute()
{
- SupportedMessage supported = new SupportedMessage();
- supported.cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
+ List<String> cqlVersions = new ArrayList<String>();
+ cqlVersions.add(QueryProcessor.CQL_VERSION.toString());
+
+ List<String> compressions = new ArrayList<String>();
if (FrameCompressor.SnappyCompressor.instance != null)
- supported.compressions.add("snappy");
- return supported;
+ compressions.add("snappy");
+
+ Map<String, List<String>> supported = new HashMap<String, List<String>>();
+ supported.put(StartupMessage.CQL_VERSION, cqlVersions);
+ supported.put(StartupMessage.COMPRESSION, compressions);
+
+ return new SupportedMessage(supported);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index fc28b69..23199f1 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -34,83 +34,29 @@ import org.apache.cassandra.utils.SemanticVersion;
*/
public class StartupMessage extends Message.Request
{
- public enum Option implements OptionCodec.Codecable<Option>
- {
- COMPRESSION(1);
-
- private final int id;
-
- private Option(int id)
- {
- this.id = id;
- }
-
- public int getId()
- {
- return id;
- }
-
- public Object readValue(ChannelBuffer cb)
- {
- switch (this)
- {
- case COMPRESSION:
- return CBUtil.readString(cb);
- default:
- throw new AssertionError();
- }
- }
-
- public void writeValue(Object value, ChannelBuffer cb)
- {
- switch (this)
- {
- case COMPRESSION:
- assert value instanceof String;
- cb.writeBytes(CBUtil.stringToCB((String)value));
- break;
- }
- }
-
- public int serializedValueSize(Object value)
- {
- switch (this)
- {
- case COMPRESSION:
- return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
- default:
- throw new AssertionError();
- }
- }
- }
-
- private static OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class);
+ public static final String CQL_VERSION = "CQL_VERSION";
+ public static final String COMPRESSION = "COMPRESSION";
public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
{
public StartupMessage decode(ChannelBuffer body)
{
- String verString = CBUtil.readString(body);
-
- Map<Option, Object> options = optionCodec.decode(body);
- return new StartupMessage(verString, options);
+ return new StartupMessage(CBUtil.readStringMap(body));
}
public ChannelBuffer encode(StartupMessage msg)
{
- ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion);
- ChannelBuffer ocb = optionCodec.encode(msg.options);
- return ChannelBuffers.wrappedBuffer(vcb, ocb);
+ ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+ CBUtil.writeStringMap(cb, msg.options);
+ return cb;
}
};
- public final String cqlVersion;
- public final Map<Option, Object> options;
+ public final Map<String, String> options;
- public StartupMessage(String cqlVersion, Map<Option, Object> options)
+ public StartupMessage(Map<String, String> options)
{
super(Message.Type.STARTUP);
- this.cqlVersion = cqlVersion;
this.options = options;
}
@@ -123,13 +69,17 @@ public class StartupMessage extends Message.Request
{
try
{
+ String cqlVersion = options.get(CQL_VERSION);
+ if (cqlVersion == null)
+ throw new ProtocolException("Missing value CQL_VERSION in STARTUP message");
+
connection.clientState().setCQLVersion(cqlVersion);
if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0)
throw new ProtocolException(String.format("CQL version %s is not support by the binary protocol (supported version are >= 3.0.0)", cqlVersion));
- if (options.containsKey(Option.COMPRESSION))
+ if (options.containsKey(COMPRESSION))
{
- String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase();
+ String compression = options.get(COMPRESSION).toLowerCase();
if (compression.equals("snappy"))
{
if (FrameCompressor.SnappyCompressor.instance == null)
@@ -156,6 +106,6 @@ public class StartupMessage extends Message.Request
@Override
public String toString()
{
- return "STARTUP cqlVersion=" + cqlVersion;
+ return "STARTUP " + options;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 0b1a6b5..fe0fa77 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.transport.messages;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -35,33 +36,23 @@ public class SupportedMessage extends Message.Response
{
public SupportedMessage decode(ChannelBuffer body)
{
- List<String> versions = CBUtil.readStringList(body);
- List<String> compressions = CBUtil.readStringList(body);
- return new SupportedMessage(versions, compressions);
+ return new SupportedMessage(CBUtil.readStringToStringListMap(body));
}
public ChannelBuffer encode(SupportedMessage msg)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
- CBUtil.writeStringList(cb, msg.cqlVersions);
- CBUtil.writeStringList(cb, msg.compressions);
+ CBUtil.writeStringToStringListMap(cb, msg.supported);
return cb;
}
};
- public final List<String> cqlVersions;
- public final List<String> compressions;
+ public final Map<String, List<String>> supported;
- public SupportedMessage()
- {
- this(new ArrayList<String>(), new ArrayList<String>());
- }
-
- private SupportedMessage(List<String> cqlVersions, List<String> compressions)
+ public SupportedMessage(Map<String, List<String>> supported)
{
super(Message.Type.SUPPORTED);
- this.cqlVersions = cqlVersions;
- this.compressions = compressions;
+ this.supported = supported;
}
public ChannelBuffer encode()
@@ -72,6 +63,6 @@ public class SupportedMessage extends Message.Response
@Override
public String toString()
{
- return "SUPPORTED versions=" + cqlVersions + " compressions=" + compressions;
+ return "SUPPORTED " + supported;
}
}