You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/07/05 18:22:39 UTC

[4/6] Custom CQL protocol

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
new file mode 100644
index 0000000..fa92619
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.Message;
+
+/**
+ * Message to indicate that the server is ready to receive requests. Its body is empty.
+ */
+public class ReadyMessage extends Message.Response
+{
+    public static final Message.Codec<ReadyMessage> codec = new Message.Codec<ReadyMessage>()
+    {
+        public ReadyMessage decode(ChannelBuffer body)
+        {
+            return new ReadyMessage(); // the body is not read: READY carries no payload
+        }
+
+        public ChannelBuffer encode(ReadyMessage msg)
+        {
+            return ChannelBuffers.EMPTY_BUFFER; // nothing to serialize for READY
+        }
+    };
+
+    public ReadyMessage()
+    {
+        super(Message.Type.READY);
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this); // delegates to the shared codec, i.e. always the empty buffer
+    }
+
+    @Override
+    public String toString()
+    {
+        return "READY";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
new file mode 100644
index 0000000..35f8207
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.thrift.CqlPreparedResult;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public abstract class ResultMessage extends Message.Response
+{
+    public static final Message.Codec<ResultMessage> codec = new Message.Codec<ResultMessage>()
+    {
+        public ResultMessage decode(ChannelBuffer body)
+        {
+            Kind kind = Kind.fromId(body.readInt()); // the body starts with the int id of the result kind
+            return kind.subcodec.decode(body); // the remainder is decoded by that kind's own codec
+        }
+
+        public ChannelBuffer encode(ResultMessage msg)
+        {
+            ChannelBuffer kcb = ChannelBuffers.buffer(4); // 4 bytes: exactly one int, the kind id
+            kcb.writeInt(msg.kind.id);
+
+            ChannelBuffer body = msg.encodeBody();
+            return ChannelBuffers.wrappedBuffer(kcb, body); // kind id first, then the kind-specific body
+        }
+    };
+
+    private enum Kind
+    {
+        VOID         (1, Void.subcodec),
+        ROWS         (2, Rows.subcodec),
+        SET_KEYSPACE (3, SetKeyspace.subcodec),
+        PREPARED     (4, Prepared.subcodec);
+
+        public final int id; // value written in the message body to identify the kind
+        public final Message.Codec<ResultMessage> subcodec; // codec for the kind-specific part of the body
+
+        private static final Kind[] ids; // lookup table indexed by id; slots with no kind stay null
+        static
+        {
+            int maxId = -1;
+            for (Kind k : Kind.values())
+                maxId = Math.max(maxId, k.id);
+            ids = new Kind[maxId + 1];
+            for (Kind k : Kind.values())
+            {
+                if (ids[k.id] != null)
+                    throw new IllegalStateException("Duplicate kind id");
+                ids[k.id] = k;
+            }
+        }
+
+        private Kind(int id, Message.Codec<ResultMessage> subcodec)
+        {
+            this.id = id;
+            this.subcodec = subcodec;
+        }
+
+        public static Kind fromId(int id)
+        {
+            // Range-check first: the id is read from the message body, so it may be any int.
+            if (id < 0 || id >= ids.length || ids[id] == null)
+                throw new ProtocolException(String.format("Unknown kind id %d in RESULT message", id));
+            return ids[id];
+        }
+    }
+
+    private final Kind kind; // its id is the first int the codec writes, and it selects the sub-codec on decode
+
+    protected ResultMessage(Kind kind)
+    {
+        super(Message.Type.RESULT);
+        this.kind = kind;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    protected abstract ChannelBuffer encodeBody(); // kind-specific payload only; the codec prepends the kind id
+
+    public abstract CqlResult toThriftResult(); // thrift CqlResult form of this result (Prepared throws instead)
+
+    public static class Void extends ResultMessage
+    {
+        // not instantiated directly: use instance()
+        private Void()
+        {
+            super(Kind.VOID);
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return Void.instance(); // nothing is read from the body
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Void;
+                return ChannelBuffers.EMPTY_BUFFER; // a VOID result has no payload of its own
+            }
+        };
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        public static Void instance()
+        {
+            return Holder.instance; // the single shared instance
+        }
+
+        // Battling java initialization: the instance is held by a nested class instead of a static field of Void
+        private static class Holder
+        {
+            static final Void instance = new Void();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EMPTY RESULT";
+        }
+    }
+
+    public static class SetKeyspace extends ResultMessage
+    {
+        private final String keyspace; // keyspace name carried as the payload of this result
+
+        public SetKeyspace(String keyspace)
+        {
+            super(Kind.SET_KEYSPACE);
+            this.keyspace = keyspace;
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                String keyspace = CBUtil.readString(body); // the payload is a single string
+                return new SetKeyspace(keyspace);
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof SetKeyspace;
+                return CBUtil.stringToCB(((SetKeyspace)msg).keyspace); // the keyspace name is the whole payload
+            }
+        };
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID); // the thrift form is a VOID result; the keyspace name is not passed on
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT set keyspace " + keyspace;
+        }
+    }
+
+    public static class Rows extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return new Rows(ResultSet.codec.decode(body)); // the payload is a serialized ResultSet
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Rows;
+                Rows rowMsg = (Rows)msg;
+                return ResultSet.codec.encode(rowMsg.result); // serialization is left to the ResultSet codec
+            }
+        };
+
+        public final ResultSet result; // the rows carried by this message
+
+        public Rows(ResultSet result)
+        {
+            super(Kind.ROWS);
+            this.result = result;
+        }
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return result.toThriftResult(); // conversion is delegated to the ResultSet
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ROWS " + result;
+        }
+
+    }
+
+    public static class Prepared extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                int id = body.readInt(); // the statement id comes first, the metadata follows
+                return new Prepared(id, ResultSet.Metadata.codec.decode(body));
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Prepared;
+                Prepared prepared = (Prepared)msg;
+                return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId), ResultSet.Metadata.codec.encode(prepared.metadata)); // same order as decode
+            }
+        };
+
+        public final int statementId; // id of the prepared statement
+        public final ResultSet.Metadata metadata; // column specifications associated with the statement
+
+        public Prepared(int statementId, List<ColumnSpecification> names)
+        {
+            this(statementId, new ResultSet.Metadata(names)); // wraps the specifications in a Metadata
+        }
+
+        private Prepared(int statementId, ResultSet.Metadata metadata)
+        {
+            super(Kind.PREPARED);
+            this.statementId = statementId;
+            this.metadata = metadata;
+        }
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            throw new UnsupportedOperationException(); // no CqlResult form for this kind; see toThriftPreparedResult()
+        }
+
+        public CqlPreparedResult toThriftPreparedResult()
+        {
+            List<String> namesString = new ArrayList<String>(metadata.names.size());
+            List<String> typesString = new ArrayList<String>(metadata.names.size());
+            for (ColumnSpecification name : metadata.names) // one name and one short type name per specification
+            {
+                namesString.add(name.toString());
+                typesString.add(TypeParser.getShortName(name.type));
+            }
+            return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT PREPARED " + statementId + " " + metadata;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
new file mode 100644
index 0000000..0e33da5
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * The initial message of the protocol.
+ * Sets up a number of connection options.
+ */
+public class StartupMessage extends Message.Request
+{
+    public enum Option implements OptionCodec.Codecable<Option>
+    {
+        COMPRESSION(1);
+
+        private final int id; // numeric id of the option, exposed through getId()
+
+        private Option(int id)
+        {
+            this.id = id;
+        }
+
+        public int getId()
+        {
+            return id;
+        }
+
+        public Object readValue(ChannelBuffer cb)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    return CBUtil.readString(cb); // the compression option's value is a string
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public void writeValue(Object value, ChannelBuffer cb)
+        {
+            // Like readValue and serializedValueSize, fail loudly on an option this method does not
+            // handle instead of silently writing nothing.
+            if (this != COMPRESSION)
+                throw new AssertionError();
+
+            assert value instanceof String;
+            cb.writeBytes(CBUtil.stringToCB((String)value));
+        }
+
+        public int serializedValueSize(Object value)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    return 2 + ((String)value).getBytes(Charsets.UTF_8).length; // UTF-8 length plus 2 — presumably the length prefix CBUtil.stringToCB writes; confirm
+                default:
+                    throw new AssertionError();
+            }
+        }
+    }
+
+    private static final OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class);
+
+    public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
+    {
+        public StartupMessage decode(ChannelBuffer body)
+        {
+            String verString = CBUtil.readString(body); // the CQL version string comes first
+
+            Map<Option, Object> options = optionCodec.decode(body); // the options follow the version string
+            return new StartupMessage(verString, options);
+        }
+
+        public ChannelBuffer encode(StartupMessage msg)
+        {
+            ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion);
+            ChannelBuffer ocb = optionCodec.encode(msg.options);
+            return ChannelBuffers.wrappedBuffer(vcb, ocb); // version string, then options: the order decode reads
+        }
+    };
+
+    public final String cqlVersion; // CQL version string; execute() hands it to the connection's client state
+    public final Map<Option, Object> options; // option values keyed by Option; the map is stored without copying
+
+    public StartupMessage(String cqlVersion, Map<Option, Object> options)
+    {
+        super(Message.Type.STARTUP);
+        this.cqlVersion = cqlVersion;
+        this.options = options;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this); // delegates to the shared codec
+    }
+
+    public Message.Response execute() // applies the CQL version and compression option, then answers READY or AUTHENTICATE
+    {
+        try
+        {
+            connection.clientState().setCQLVersion(cqlVersion);
+            if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) // NOTE(review): bound is 2.99.0 while the message says 3.0.0 — presumably to admit 3.0.0 pre-releases; confirm
+                throw new ProtocolException(String.format("CQL version %s is not supported by the binary protocol (supported versions are >= 3.0.0)", cqlVersion));
+
+            if (options.containsKey(Option.COMPRESSION))
+            {
+                // Lower-case with a fixed locale so the comparison does not depend on the JVM's default locale.
+                String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase(java.util.Locale.US);
+                if (compression.equals("snappy")) {
+                    if (FrameCompressor.SnappyCompressor.instance == null)
+                        throw new InvalidRequestException("This instance does not support Snappy compression");
+                    connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+                }
+                else
+                {
+                    throw new InvalidRequestException(String.format("Unknown compression algorithm: %s", compression));
+                }
+            }
+
+            if (connection.clientState().isLogged())
+                return new ReadyMessage();
+            else
+                return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName()); // not logged in: answer with the configured authenticator's class name
+        }
+        catch (InvalidRequestException e)
+        {
+            return ErrorMessage.fromException(e); // invalid requests are returned as an error response, not thrown
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "STARTUP cqlVersion=" + cqlVersion; // the options map is not included
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
new file mode 100644
index 0000000..aaed1cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * Response message carrying the list of supported CQL versions and the list of supported compressions.
+ */
+public class SupportedMessage extends Message.Response
+{
+    public static final Message.Codec<SupportedMessage> codec = new Message.Codec<SupportedMessage>()
+    {
+        public SupportedMessage decode(ChannelBuffer body)
+        {
+            List<String> versions = CBUtil.readStringList(body); // first list: CQL versions
+            List<String> compressions = CBUtil.readStringList(body); // second list: compressions
+            return new SupportedMessage(versions, compressions);
+        }
+
+        public ChannelBuffer encode(SupportedMessage msg)
+        {
+            ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringList(cb, msg.cqlVersions); // written in the order decode reads them
+            CBUtil.writeStringList(cb, msg.compressions);
+            return cb;
+        }
+    };
+
+    public final List<String> cqlVersions;
+    public final List<String> compressions;
+
+    public SupportedMessage()
+    {
+        this(new ArrayList<String>(), new ArrayList<String>()); // both lists start empty — presumably filled in by the caller; confirm
+    }
+
+    private SupportedMessage(List<String> cqlVersions, List<String> compressions)
+    {
+        super(Message.Type.SUPPORTED);
+        this.cqlVersions = cqlVersions;
+        this.compressions = compressions;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SUPPORTED versions=" + cqlVersions + " compressions=" + compressions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/test/unit/org/apache/cassandra/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java
index 4406103..c948cfa 100644
--- a/test/unit/org/apache/cassandra/EmbeddedServer.java
+++ b/test/unit/org/apache/cassandra/EmbeddedServer.java
@@ -55,7 +55,7 @@ public class EmbeddedServer extends SchemaLoader
                 {
                     case Thrift:
                     default:
-                        daemon = new org.apache.cassandra.thrift.CassandraDaemon();
+                        daemon = new org.apache.cassandra.service.CassandraDaemon();
                 }
                 daemon.activate();
             }