You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/27 09:48:10 UTC

git commit: Binary protocol: adds message to batch (prepared or not) statements

Updated Branches:
  refs/heads/trunk 314c8e85d -> 6d04ef038


Binary protocol: adds message to batch (prepared or not) statements

patch by slebresne; reviewed by iamaleksey for CASSANDRA-4693


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d04ef03
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d04ef03
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d04ef03

Branch: refs/heads/trunk
Commit: 6d04ef0383eb09716377f649b4c6f903624a31ac
Parents: 314c8e8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 16 16:37:42 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon May 27 09:47:14 2013 +0200

----------------------------------------------------------------------
 doc/native_protocol_v2.spec                        |   42 ++-
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   17 +-
 .../cassandra/cql3/statements/BatchStatement.java  |   68 +++-
 .../org/apache/cassandra/transport/CBUtil.java     |    7 +
 .../org/apache/cassandra/transport/Message.java    |    3 +-
 .../cassandra/transport/messages/BatchMessage.java |  255 +++++++++++++++
 .../cassandra/transport/messages/QueryMessage.java |    7 +-
 7 files changed, 374 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index a765700..20c0a80 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -20,7 +20,8 @@ Table of Contents
       4.1.4. QUERY
       4.1.5. PREPARE
       4.1.6. EXECUTE
-      4.1.7. REGISTER
+      4.1.7. BATCH
+      4.1.8. REGISTER
     4.2. Responses
       4.2.1. ERROR
       4.2.2. READY
@@ -159,6 +160,7 @@ Table of Contents
     0x0A    EXECUTE
     0x0B    REGISTER
     0x0C    EVENT
+    0x0D    BATCH
 
   Messages are described in Section 4.
 
@@ -304,8 +306,36 @@ Table of Contents
 
   The response from the server will be a RESULT message.
 
+4.1.7. BATCH
 
-4.1.7. REGISTER
+  Allows executing a list of queries (prepared or not) as a batch (note that
+  only DML statements are accepted in a batch). The body of the message must
+  be:
+    <type><n><query_1>...<query_n><consistency>
+  where:
+    - <type> is a [byte] indicating the type of batch to use:
+        - If <type> == 0, the batch will be "logged". This is equivalent to a
+          normal CQL3 batch statement.
+        - If <type> == 1, the batch will be "unlogged".
+        - If <type> == 2, the batch will be a "counter" batch (and non-counter
+          statements will be rejected).
+    - <n> is a [short] indicating the number of following queries.
+    - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
+      form:
+        <kind><string_or_id><n><value_1>...<value_n>
+      where:
+       - <kind> is a [byte] indicating whether the following query is a prepared
+         one or not. <kind> value must be either 0 or 1.
+       - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be
+         a [long string] query string (as in QUERY, the query string might contain
+         bind markers). Otherwise (that is, if <kind> == 1), it should be a
+         [short bytes] representing a prepared query ID.
+       - <n> is a [short] indicating the number (possibly 0) of following values.
+       - <value_1>...<value_n> are the [bytes] to use for bound variables.
+    - <consistency> is the [consistency] level for the operation.
+
+
+4.1.8. REGISTER
 
   Register this connection to receive some type of events. The body of the
   message is a [string list] representing the event types to register to. See
@@ -645,8 +675,10 @@ Table of Contents
               bytes] representing the unknown ID.
 
 8. Changes from v1
-  * Protocol is versioned to allow old client connects to a newer server, if a newer
-    client connects to an older server, it needs to check if it gets a
+  * Protocol is versioned to allow old clients to connect to a newer server. If a
+    newer client connects to an older server, it needs to check if it gets a
     ProtocolException on connection and try connecting with a lower version.
   * A query can now have bind variables even though the statement is not
-    prepared. (see Section 4.1.4)
\ No newline at end of file
+    prepared; see Section 4.1.4.
+  * A new BATCH message allows batching a set of queries (prepared or not); see
+    Section 4.1.7.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f7aebff..2edbb96 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -125,8 +125,8 @@ public class QueryProcessor
     throws RequestExecutionException, RequestValidationException
     {
         ClientState clientState = queryState.getClientState();
-        statement.validate(clientState);
         statement.checkAccess(clientState);
+        statement.validate(clientState);
         ResultMessage result = statement.execute(cl, queryState, variables);
         return result == null ? new ResultMessage.Void() : result;
     }
@@ -147,6 +147,11 @@ public class QueryProcessor
         return processStatement(prepared, cl, queryState, variables);
     }
 
+    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+    {
+        return getStatement(queryStr, queryState.getClientState()).statement;
+    }
+
     public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
     {
         try
@@ -261,6 +266,16 @@ public class QueryProcessor
         return processStatement(statement, cl, queryState, variables);
     }
 
+    public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)
+    throws RequestExecutionException, RequestValidationException
+    {
+        ClientState clientState = queryState.getClientState();
+        batch.checkAccess(clientState);
+        batch.validate(clientState);
+        batch.executeWithPerStatementVariables(cl, queryState, variables);
+        return new ResultMessage.Void();
+    }
+
     private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
     throws RequestValidationException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index c7cd9ae..d6d0e16 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -98,25 +98,47 @@ public class BatchStatement implements CQLStatement
     {
         Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
         for (ModificationStatement statement : statements)
+            addStatementMutations(statement, variables, local, cl, now, mutations);
+
+        return mutations.values();
+    }
+
+    private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
+        for (int i = 0; i < statements.size(); i++)
         {
-            // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true))
+            ModificationStatement statement = statements.get(i);
+            List<ByteBuffer> statementVariables = variables.get(i);
+            addStatementMutations(statement, statementVariables, false, cl, now, mutations);
+        }
+        return mutations.values();
+    }
+
+    private void addStatementMutations(ModificationStatement statement,
+                                       List<ByteBuffer> variables,
+                                       boolean local,
+                                       ConsistencyLevel cl,
+                                       long now,
+                                       Map<Pair<String, ByteBuffer>, IMutation> mutations)
+    throws RequestExecutionException, RequestValidationException
+    {
+        // Group mutation together, otherwise they won't get applied atomically
+        for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true))
+        {
+            Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
+            IMutation existing = mutations.get(key);
+
+            if (existing == null)
             {
-                Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
-                IMutation existing = mutations.get(key);
-
-                if (existing == null)
-                {
-                    mutations.put(key, m);
-                }
-                else
-                {
-                    existing.addAll(m);
-                }
+                mutations.put(key, m);
+            }
+            else
+            {
+                existing.addAll(m);
             }
         }
-
-        return mutations.values();
     }
 
     public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
@@ -124,10 +146,22 @@ public class BatchStatement implements CQLStatement
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
-        Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
+        execute(getMutations(variables, false, cl, queryState.getTimestamp()), cl);
+        return null;
+    }
+
+    public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+    {
+        if (cl == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+
+        execute(getMutations(variables, cl, queryState.getTimestamp()), cl);
+    }
+
+    private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+    {
         boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
-        return null;
     }
 
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 2ad8c72..897a3d9 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -102,6 +102,13 @@ public abstract class CBUtil
         return cb;
     }
 
+    public static ChannelBuffer byteToCB(byte b)
+    {
+        ChannelBuffer cb = ChannelBuffers.buffer(1);
+        cb.writeByte(b);
+        return cb;
+    }
+
     public static ChannelBuffer intToCB(int i)
     {
         ChannelBuffer cb = ChannelBuffers.buffer(4);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 3121ce9..e57da51 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -69,7 +69,8 @@ public abstract class Message
         PREPARE      (9,  Direction.REQUEST,  PrepareMessage.codec),
         EXECUTE      (10, Direction.REQUEST,  ExecuteMessage.codec),
         REGISTER     (11, Direction.REQUEST,  RegisterMessage.codec),
-        EVENT        (12, Direction.RESPONSE, EventMessage.codec);
+        EVENT        (12, Direction.RESPONSE, EventMessage.codec),
+        BATCH        (13, Direction.REQUEST,  BatchMessage.codec);
 
         public final int opcode;
         public final Direction direction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
new file mode 100644
index 0000000..3bec918
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.MD5Digest;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class BatchMessage extends Message.Request
+{
+    public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>()
+    {
+        public BatchMessage decode(ChannelBuffer body, int version)
+        {
+            if (version == 1)
+                throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
+
+            byte type = body.readByte();
+            int n = body.readUnsignedShort();
+            List<Object> queryOrIds = new ArrayList<Object>(n);
+            List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n);
+            for (int i = 0; i < n; i++)
+            {
+                byte kind = body.readByte();
+                if (kind == 0)
+                    queryOrIds.add(CBUtil.readLongString(body));
+                else if (kind == 1)
+                    queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
+                else
+                    throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
+
+                int count = body.readUnsignedShort();
+                List<ByteBuffer> values = count == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(count);
+                for (int j = 0; j < count; j++)
+                    values.add(CBUtil.readValue(body));
+                variables.add(values);
+            }
+            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+            return new BatchMessage(toType(type), queryOrIds, variables, consistency);
+        }
+
+        public ChannelBuffer encode(BatchMessage msg)
+        {
+            // We have:
+            //   - type
+            //   - Number of queries
+            //   - For each query:
+            //      - kind
+            //      - string or id
+            //      - value count
+            //      - values
+            //   - consistency
+            int queries = msg.queryOrIdList.size();
+            int totalValues = count(msg.values);
+
+            ChannelBuffer header = ChannelBuffers.buffer(3);
+            header.writeByte(fromType(msg.type));
+            header.writeShort(queries);
+
+            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2 + queries * 3, 0, totalValues);
+            builder.add(header);
+            for (int i = 0; i < queries; i++)
+            {
+                Object q = msg.queryOrIdList.get(i);
+                builder.add(CBUtil.byteToCB((byte)(q instanceof String ? 0 : 1)));
+                if (q instanceof String)
+                    builder.add(CBUtil.longStringToCB((String)q));
+                else
+                    builder.add(CBUtil.bytesToCB(((MD5Digest)q).bytes));
+                List<ByteBuffer> queryValues = msg.values.get(i);
+                builder.add(CBUtil.shortToCB(queryValues.size()));
+                for (ByteBuffer value : queryValues)
+                    builder.addValue(value);
+            }
+
+            builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+            return builder.build();
+        }
+
+        private BatchStatement.Type toType(byte b)
+        {
+            if (b == 0)
+                return BatchStatement.Type.LOGGED;
+            else if (b == 1)
+                return BatchStatement.Type.UNLOGGED;
+            else if (b == 2)
+                return BatchStatement.Type.COUNTER;
+            else
+                throw new ProtocolException("Invalid BATCH message type " + b);
+        }
+
+        private byte fromType(BatchStatement.Type type)
+        {
+            switch (type)
+            {
+                case LOGGED:   return 0;
+                case UNLOGGED: return 1;
+                case COUNTER:  return 2;
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        private int count(List<List<ByteBuffer>> values)
+        {
+            int count = 0;
+            for (List<ByteBuffer> l : values)
+                count += l.size();
+            return count;
+        }
+    };
+
+    public final BatchStatement.Type type;
+    public final List<Object> queryOrIdList;
+    public final List<List<ByteBuffer>> values;
+    public final ConsistencyLevel consistency;
+
+    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency)
+    {
+        super(Message.Type.BATCH);
+        this.type = type;
+        this.queryOrIdList = queryOrIdList;
+        this.values = values;
+        this.consistency = consistency;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    public Message.Response execute(QueryState state)
+    {
+        try
+        {
+            UUID tracingId = null;
+            if (isTracingRequested())
+            {
+                tracingId = UUIDGen.getTimeUUID();
+                state.prepareTracingSession(tracingId);
+            }
+
+            if (state.traceNextQuery())
+            {
+                state.createTracingSession();
+                // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
+                Tracing.instance().begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap());
+            }
+
+            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
+            for (int i = 0; i < queryOrIdList.size(); i++)
+            {
+                Object query = queryOrIdList.get(i);
+                CQLStatement statement;
+                if (query instanceof String)
+                {
+                    statement = QueryProcessor.parseStatement((String)query, state);
+                }
+                else
+                {
+                    statement = QueryProcessor.getPrepared((MD5Digest)query);
+                    if (statement == null)
+                        throw new PreparedQueryNotFoundException((MD5Digest)query);
+                }
+
+                List<ByteBuffer> queryValues = values.get(i);
+                if (queryValues.size() != statement.getBoundsTerms())
+                    throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables",
+                                                                    statement.getBoundsTerms(),
+                                                                    queryValues.size()));
+                if (!(statement instanceof ModificationStatement))
+                    throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
+
+                ModificationStatement mst = (ModificationStatement)statement;
+                if (mst.isCounter())
+                {
+                    if (type != BatchStatement.Type.COUNTER)
+                        throw new InvalidRequestException("Cannot include counter statement in a non-counter batch");
+                }
+                else
+                {
+                    if (type == BatchStatement.Type.COUNTER)
+                        throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+                }
+                statements.add(mst);
+            }
+
+            // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchStatement ctor
+            // (and no value would be really correct, so we prefer passing a clearly wrong one).
+            BatchStatement batch = new BatchStatement(-1, type, statements, new Attributes());
+            Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values);
+
+            if (tracingId != null)
+                response.setTracingId(tracingId);
+
+            return response;
+        }
+        catch (Exception e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+        finally
+        {
+            Tracing.instance().stopSession();
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("BATCH of [");
+        for (int i = 0; i < queryOrIdList.size(); i++)
+        {
+            if (i > 0) sb.append(", ");
+            sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values");
+        }
+        sb.append("] at consistency ").append(consistency);
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index d8d7fa2..9735654 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -44,13 +44,18 @@ public class QueryMessage extends Message.Request
         {
             String query = CBUtil.readLongString(body);
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-            List<ByteBuffer> values = new ArrayList<ByteBuffer>();
+            List<ByteBuffer> values;
             if (body.readable())
             {
                 int paramCount = body.readUnsignedShort();
+                values = paramCount == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(paramCount);
                 for (int i = 0; i < paramCount; i++)
                      values.add(CBUtil.readValue(body));
             }
+            else
+            {
+                values = Collections.emptyList();
+            }
             return new QueryMessage(query, values, consistency);
         }