You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/07/05 18:22:39 UTC
[4/6] Custom CQL protocol
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
new file mode 100644
index 0000000..fa92619
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.Message;
+
+/**
+ * Response indicating that the server is ready to receive requests.
+ * <p>
+ * The message has no payload: encoding always yields an empty buffer and
+ * decoding ignores whatever body it is handed.
+ */
+public class ReadyMessage extends Message.Response
+{
+    /** Codec for the (empty) body of a READY message. */
+    public static final Message.Codec<ReadyMessage> codec = new Message.Codec<ReadyMessage>()
+    {
+        @Override
+        public ReadyMessage decode(ChannelBuffer body)
+        {
+            // Nothing is read from the body.
+            return new ReadyMessage();
+        }
+
+        @Override
+        public ChannelBuffer encode(ReadyMessage msg)
+        {
+            return ChannelBuffers.EMPTY_BUFFER;
+        }
+    };
+
+    /** Creates a message of type {@code Message.Type.READY}. */
+    public ReadyMessage()
+    {
+        super(Message.Type.READY);
+    }
+
+    /** Encodes this message through {@link #codec}. */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "READY";
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
new file mode 100644
index 0000000..35f8207
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.thrift.CqlPreparedResult;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Base class of the RESULT responses.
+ * <p>
+ * The body of a RESULT message is a 4-byte kind id followed by a payload whose
+ * layout depends on that kind. Each concrete kind is a nested subclass that
+ * exposes a {@code subcodec} for its own payload; {@link #codec} writes/reads
+ * the kind id and delegates the remainder to the matching sub-codec.
+ */
+public abstract class ResultMessage extends Message.Response
+{
+    /** Codec for a whole RESULT body (kind id + kind-specific payload). */
+    public static final Message.Codec<ResultMessage> codec = new Message.Codec<ResultMessage>()
+    {
+        @Override
+        public ResultMessage decode(ChannelBuffer body)
+        {
+            // The leading int selects the kind; the rest of the buffer belongs to its sub-codec.
+            Kind kind = Kind.fromId(body.readInt());
+            return kind.subcodec.decode(body);
+        }
+
+        @Override
+        public ChannelBuffer encode(ResultMessage msg)
+        {
+            ChannelBuffer kcb = ChannelBuffers.buffer(4);
+            kcb.writeInt(msg.kind.id);
+
+            ChannelBuffer body = msg.encodeBody();
+            return ChannelBuffers.wrappedBuffer(kcb, body);
+        }
+    };
+
+    /** The kinds of RESULT message, each with its wire id and payload codec. */
+    private enum Kind
+    {
+        VOID         (1, Void.subcodec),
+        ROWS         (2, Rows.subcodec),
+        SET_KEYSPACE (3, SetKeyspace.subcodec),
+        PREPARED     (4, Prepared.subcodec);
+
+        public final int id;
+        public final Message.Codec<ResultMessage> subcodec;
+
+        // Lookup table indexed by kind id; slots that match no kind stay null.
+        private static final Kind[] ids;
+        static
+        {
+            int maxId = -1;
+            for (Kind k : Kind.values())
+                maxId = Math.max(maxId, k.id);
+            ids = new Kind[maxId + 1];
+            for (Kind k : Kind.values())
+            {
+                if (ids[k.id] != null)
+                    throw new IllegalStateException("Duplicate kind id");
+                ids[k.id] = k;
+            }
+        }
+
+        private Kind(int id, Message.Codec<ResultMessage> subcodec)
+        {
+            this.id = id;
+            this.subcodec = subcodec;
+        }
+
+        /**
+         * Returns the kind registered for the given wire id.
+         *
+         * @param id the kind id read from a RESULT body
+         * @return the matching kind, never null
+         * @throws ProtocolException if no kind uses this id, including ids
+         *         that are negative or larger than the highest known id
+         */
+        public static Kind fromId(int id)
+        {
+            // The id comes straight from the wire, so it must be range-checked before
+            // indexing: otherwise a bogus value surfaces as ArrayIndexOutOfBoundsException
+            // rather than as a protocol error.
+            Kind k = (id >= 0 && id < ids.length) ? ids[id] : null;
+            if (k == null)
+                throw new ProtocolException(String.format("Unknown kind id %d in RESULT message", id));
+            return k;
+        }
+    }
+
+    private final Kind kind;
+
+    protected ResultMessage(Kind kind)
+    {
+        super(Message.Type.RESULT);
+        this.kind = kind;
+    }
+
+    /** Encodes this message (kind id followed by the payload) through {@link #codec}. */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    /** Encodes only the kind-specific payload, i.e. everything after the kind id. */
+    protected abstract ChannelBuffer encodeBody();
+
+    /** Converts this result into its Thrift {@link CqlResult} representation. */
+    public abstract CqlResult toThriftResult();
+
+    /** Result carrying no data; its payload is empty. */
+    public static class Void extends ResultMessage
+    {
+        // Not instantiable from outside: obtain the shared object through instance().
+        private Void()
+        {
+            super(Kind.VOID);
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            @Override
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return Void.instance();
+            }
+
+            @Override
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Void;
+                return ChannelBuffers.EMPTY_BUFFER;
+            }
+        };
+
+        @Override
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        @Override
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        /** Returns the shared {@code Void} result. */
+        public static Void instance()
+        {
+            return Holder.instance;
+        }
+
+        // Battling java initialization: the instance lives in a separate holder class so
+        // that it is only created on the first call to instance(), not while Void itself
+        // (whose subcodec is referenced by Kind) is being initialized.
+        private static class Holder
+        {
+            static final Void instance = new Void();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EMPTY RESULT";
+        }
+    }
+
+    /** Result of a statement that changed the current keyspace; the payload is the keyspace name. */
+    public static class SetKeyspace extends ResultMessage
+    {
+        private final String keyspace;
+
+        public SetKeyspace(String keyspace)
+        {
+            super(Kind.SET_KEYSPACE);
+            this.keyspace = keyspace;
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            @Override
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                String keyspace = CBUtil.readString(body);
+                return new SetKeyspace(keyspace);
+            }
+
+            @Override
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof SetKeyspace;
+                return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
+            }
+        };
+
+        @Override
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        @Override
+        public CqlResult toThriftResult()
+        {
+            // The keyspace name is not carried over: Thrift sees a plain VOID result.
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT set keyspace " + keyspace;
+        }
+    }
+
+    /** Result wrapping a {@link ResultSet}; the payload is the result set as encoded by its own codec. */
+    public static class Rows extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            @Override
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return new Rows(ResultSet.codec.decode(body));
+            }
+
+            @Override
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Rows;
+                Rows rowMsg = (Rows)msg;
+                return ResultSet.codec.encode(rowMsg.result);
+            }
+        };
+
+        public final ResultSet result;
+
+        public Rows(ResultSet result)
+        {
+            super(Kind.ROWS);
+            this.result = result;
+        }
+
+        @Override
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        @Override
+        public CqlResult toThriftResult()
+        {
+            return result.toThriftResult();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ROWS " + result;
+        }
+    }
+
+    /**
+     * Result of preparing a statement; the payload is the statement id (an int)
+     * followed by the encoded {@link ResultSet.Metadata}.
+     */
+    public static class Prepared extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            @Override
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                int id = body.readInt();
+                return new Prepared(id, ResultSet.Metadata.codec.decode(body));
+            }
+
+            @Override
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Prepared;
+                Prepared prepared = (Prepared)msg;
+                return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId), ResultSet.Metadata.codec.encode(prepared.metadata));
+            }
+        };
+
+        public final int statementId;
+        public final ResultSet.Metadata metadata;
+
+        public Prepared(int statementId, List<ColumnSpecification> names)
+        {
+            this(statementId, new ResultSet.Metadata(names));
+        }
+
+        private Prepared(int statementId, ResultSet.Metadata metadata)
+        {
+            super(Kind.PREPARED);
+            this.statementId = statementId;
+            this.metadata = metadata;
+        }
+
+        @Override
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        /**
+         * Not supported for this kind; use {@link #toThriftPreparedResult()} instead.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public CqlResult toThriftResult()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Builds the Thrift view of this result: the statement id, the number of
+         * column specifications in the metadata, and their names and short type names.
+         */
+        public CqlPreparedResult toThriftPreparedResult()
+        {
+            List<String> namesString = new ArrayList<String>(metadata.names.size());
+            List<String> typesString = new ArrayList<String>(metadata.names.size());
+            for (ColumnSpecification name : metadata.names)
+            {
+                namesString.add(name.toString());
+                typesString.add(TypeParser.getShortName(name.type));
+            }
+            return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT PREPARED " + statementId + " " + metadata;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
new file mode 100644
index 0000000..0e33da5
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.EnumMap;
+import java.util.Locale;
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * The initial message of the protocol.
+ * Sets up a number of connection options.
+ * <p>
+ * The body is the requested CQL version (a string) followed by the options as
+ * encoded by the {@link OptionCodec} for {@link Option}.
+ */
+public class StartupMessage extends Message.Request
+{
+    /** Options a client may send in a STARTUP message. */
+    public enum Option implements OptionCodec.Codecable<Option>
+    {
+        COMPRESSION(1);
+
+        private final int id;
+
+        private Option(int id)
+        {
+            this.id = id;
+        }
+
+        public int getId()
+        {
+            return id;
+        }
+
+        /** Reads the value of this option from the buffer. */
+        public Object readValue(ChannelBuffer cb)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    return CBUtil.readString(cb);
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        /** Writes the value of this option to the buffer. */
+        public void writeValue(Object value, ChannelBuffer cb)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    assert value instanceof String;
+                    cb.writeBytes(CBUtil.stringToCB((String)value));
+                    break;
+                default:
+                    // Same policy as readValue/serializedValueSize: an option without
+                    // an explicit case is a programming error, not something to skip silently.
+                    throw new AssertionError();
+            }
+        }
+
+        /** Returns the number of bytes {@link #writeValue} produces for the given value. */
+        public int serializedValueSize(Object value)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    // UTF-8 bytes of the string plus 2 (presumably the length prefix
+                    // written by CBUtil.stringToCB; confirm against CBUtil).
+                    return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
+                default:
+                    throw new AssertionError();
+            }
+        }
+    }
+
+    private static final OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class);
+
+    /** Codec for the STARTUP body: CQL version string, then the options. */
+    public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
+    {
+        public StartupMessage decode(ChannelBuffer body)
+        {
+            String verString = CBUtil.readString(body);
+
+            Map<Option, Object> options = optionCodec.decode(body);
+            return new StartupMessage(verString, options);
+        }
+
+        public ChannelBuffer encode(StartupMessage msg)
+        {
+            ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion);
+            ChannelBuffer ocb = optionCodec.encode(msg.options);
+            return ChannelBuffers.wrappedBuffer(vcb, ocb);
+        }
+    };
+
+    public final String cqlVersion;
+    public final Map<Option, Object> options;
+
+    public StartupMessage(String cqlVersion, Map<Option, Object> options)
+    {
+        super(Message.Type.STARTUP);
+        this.cqlVersion = cqlVersion;
+        this.options = options;
+    }
+
+    /** Encodes this message through {@link #codec}. */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    /**
+     * Applies the requested CQL version and options to the connection.
+     *
+     * @return a {@link ReadyMessage} if the client state is already logged in,
+     *         an {@link AuthenticateMessage} naming the configured authenticator
+     *         class otherwise, or an {@link ErrorMessage} built from any
+     *         {@link InvalidRequestException} raised on the way
+     * @throws ProtocolException if the requested CQL version is too old for the
+     *         binary protocol
+     */
+    public Message.Response execute()
+    {
+        try
+        {
+            connection.clientState().setCQLVersion(cqlVersion);
+            // NOTE(review): the bound is 2.99.0 while the message says ">= 3.0.0";
+            // presumably this is to let 3.0.0 pre-release versions through; confirm.
+            if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0)
+                throw new ProtocolException(String.format("CQL version %s is not supported by the binary protocol (supported versions are >= 3.0.0)", cqlVersion));
+
+            if (options.containsKey(Option.COMPRESSION))
+            {
+                // Fixed locale: algorithm names are protocol tokens and must not be
+                // lowercased with the rules of the JVM's default locale.
+                String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase(Locale.ENGLISH);
+                if (compression.equals("snappy"))
+                {
+                    if (FrameCompressor.SnappyCompressor.instance == null)
+                        throw new InvalidRequestException("This instance does not support Snappy compression");
+                    connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+                }
+                else
+                {
+                    throw new InvalidRequestException(String.format("Unknown compression algorithm: %s", compression));
+                }
+            }
+
+            if (connection.clientState().isLogged())
+                return new ReadyMessage();
+            else
+                return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
+        }
+        catch (InvalidRequestException e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "STARTUP cqlVersion=" + cqlVersion;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
new file mode 100644
index 0000000..aaed1cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * Message carrying a list of CQL versions and a list of compression algorithms
+ * (presumably the ones the server supports, as the message name suggests).
+ * <p>
+ * The body is the two string lists, versions first, then compressions.
+ */
+public class SupportedMessage extends Message.Response
+{
+    /** Codec for the SUPPORTED body: the version list followed by the compression list. */
+    public static final Message.Codec<SupportedMessage> codec = new Message.Codec<SupportedMessage>()
+    {
+        @Override
+        public SupportedMessage decode(ChannelBuffer body)
+        {
+            // Arguments are evaluated left to right, so versions are read before compressions.
+            return new SupportedMessage(CBUtil.readStringList(body), CBUtil.readStringList(body));
+        }
+
+        @Override
+        public ChannelBuffer encode(SupportedMessage msg)
+        {
+            ChannelBuffer out = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringList(out, msg.cqlVersions);
+            CBUtil.writeStringList(out, msg.compressions);
+            return out;
+        }
+    };
+
+    public final List<String> cqlVersions;
+    public final List<String> compressions;
+
+    /** Creates a message whose two lists are empty. */
+    public SupportedMessage()
+    {
+        this(new ArrayList<String>(), new ArrayList<String>());
+    }
+
+    private SupportedMessage(List<String> cqlVersions, List<String> compressions)
+    {
+        super(Message.Type.SUPPORTED);
+        this.cqlVersions = cqlVersions;
+        this.compressions = compressions;
+    }
+
+    /** Encodes this message through {@link #codec}. */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder("SUPPORTED versions=");
+        text.append(cqlVersions);
+        text.append(" compressions=").append(compressions);
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/test/unit/org/apache/cassandra/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java
index 4406103..c948cfa 100644
--- a/test/unit/org/apache/cassandra/EmbeddedServer.java
+++ b/test/unit/org/apache/cassandra/EmbeddedServer.java
@@ -55,7 +55,7 @@ public class EmbeddedServer extends SchemaLoader
{
case Thrift:
default:
- daemon = new org.apache.cassandra.thrift.CassandraDaemon();
+ daemon = new org.apache.cassandra.service.CassandraDaemon();
}
daemon.activate();
}