You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/05/27 09:48:10 UTC
git commit: Binary protocol: adds message to batch (prepared or not)
statements
Updated Branches:
refs/heads/trunk 314c8e85d -> 6d04ef038
Binary protocol: adds message to batch (prepared or not) statements
patch by slebresne; reviewed by iamaleksey for CASSANDRA-4693
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d04ef03
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d04ef03
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d04ef03
Branch: refs/heads/trunk
Commit: 6d04ef0383eb09716377f649b4c6f903624a31ac
Parents: 314c8e8
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu May 16 16:37:42 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon May 27 09:47:14 2013 +0200
----------------------------------------------------------------------
doc/native_protocol_v2.spec | 42 ++-
.../org/apache/cassandra/cql3/QueryProcessor.java | 17 +-
.../cassandra/cql3/statements/BatchStatement.java | 68 +++-
.../org/apache/cassandra/transport/CBUtil.java | 7 +
.../org/apache/cassandra/transport/Message.java | 3 +-
.../cassandra/transport/messages/BatchMessage.java | 255 +++++++++++++++
.../cassandra/transport/messages/QueryMessage.java | 7 +-
7 files changed, 374 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index a765700..20c0a80 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -20,7 +20,8 @@ Table of Contents
4.1.4. QUERY
4.1.5. PREPARE
4.1.6. EXECUTE
- 4.1.7. REGISTER
+ 4.1.7. BATCH
+ 4.1.8. REGISTER
4.2. Responses
4.2.1. ERROR
4.2.2. READY
@@ -159,6 +160,7 @@ Table of Contents
0x0A EXECUTE
0x0B REGISTER
0x0C EVENT
+ 0x0D BATCH
Messages are described in Section 4.
@@ -304,8 +306,36 @@ Table of Contents
The response from the server will be a RESULT message.
+4.1.7. BATCH
-4.1.7. REGISTER
+ Allows executing a list of queries (prepared or not) as a batch (note that
+ only DML statements are accepted in a batch). The body of the message must
+ be:
+ <type><n><query_1>...<query_n><consistency>
+ where:
+ - <type> is a [byte] indicating the type of batch to use:
+ - If <type> == 0, the batch will be "logged". This is equivalent to a
+ normal CQL3 batch statement.
+ - If <type> == 1, the batch will be "unlogged".
+ - If <type> == 2, the batch will be a "counter" batch (and non-counter
+ statements will be rejected).
+ - <n> is a [short] indicating the number of following queries.
+ - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
+ form:
+ <kind><string_or_id><n><value_1>...<value_n>
+ where:
+ - <kind> is a [byte] indicating whether the following query is a prepared
+ one or not. <kind> value must be either 0 or 1.
+ - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be
+ a [long string] query string (as in QUERY, the query string might contain
+ bind markers). Otherwise (that is, if <kind> == 1), it should be a
+ [short bytes] representing a prepared query ID.
+ - <n> is a [short] indicating the number (possibly 0) of following values.
+ - <value_1>...<value_n> are the [bytes] to use for bound variables.
+ - <consistency> is the [consistency] level for the operation.
+
+
+4.1.8. REGISTER
Register this connection to receive some type of events. The body of the
message is a [string list] representing the event types to register to. See
@@ -645,8 +675,10 @@ Table of Contents
bytes] representing the unknown ID.
8. Changes from v1
- * Protocol is versioned to allow old client connects to a newer server, if a newer
- client connects to an older server, it needs to check if it gets a
+ * Protocol is versioned to allow old client connects to a newer server, if a
+ newer client connects to an older server, it needs to check if it gets a
ProtocolException on connection and try connecting with a lower version.
* A query can now have bind variables even though the statement is not
- prepared. (see Section 4.1.4)
\ No newline at end of file
+ prepared; see Section 4.1.4.
+ * A new BATCH message allows to batch a set of queries (prepared or not); see
+ Section 4.1.7.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f7aebff..2edbb96 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -125,8 +125,8 @@ public class QueryProcessor
throws RequestExecutionException, RequestValidationException
{
ClientState clientState = queryState.getClientState();
- statement.validate(clientState);
statement.checkAccess(clientState);
+ statement.validate(clientState);
ResultMessage result = statement.execute(cl, queryState, variables);
return result == null ? new ResultMessage.Void() : result;
}
@@ -147,6 +147,11 @@ public class QueryProcessor
return processStatement(prepared, cl, queryState, variables);
}
+ public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ {
+ return getStatement(queryStr, queryState.getClientState()).statement;
+ }
+
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
{
try
@@ -261,6 +266,16 @@ public class QueryProcessor
return processStatement(statement, cl, queryState, variables);
}
+ public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)
+ throws RequestExecutionException, RequestValidationException
+ {
+ ClientState clientState = queryState.getClientState();
+ batch.checkAccess(clientState);
+ batch.validate(clientState);
+ batch.executeWithPerStatementVariables(cl, queryState, variables);
+ return new ResultMessage.Void();
+ }
+
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index c7cd9ae..d6d0e16 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -98,25 +98,47 @@ public class BatchStatement implements CQLStatement
{
Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
for (ModificationStatement statement : statements)
+ addStatementMutations(statement, variables, local, cl, now, mutations);
+
+ return mutations.values();
+ }
+
+ private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now)
+ throws RequestExecutionException, RequestValidationException
+ {
+ Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
+ for (int i = 0; i < statements.size(); i++)
{
- // Group mutation together, otherwise they won't get applied atomically
- for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true))
+ ModificationStatement statement = statements.get(i);
+ List<ByteBuffer> statementVariables = variables.get(i);
+ addStatementMutations(statement, statementVariables, false, cl, now, mutations);
+ }
+ return mutations.values();
+ }
+
+ private void addStatementMutations(ModificationStatement statement,
+ List<ByteBuffer> variables,
+ boolean local,
+ ConsistencyLevel cl,
+ long now,
+ Map<Pair<String, ByteBuffer>, IMutation> mutations)
+ throws RequestExecutionException, RequestValidationException
+ {
+ // Group mutation together, otherwise they won't get applied atomically
+ for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true))
+ {
+ Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
+ IMutation existing = mutations.get(key);
+
+ if (existing == null)
{
- Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
- IMutation existing = mutations.get(key);
-
- if (existing == null)
- {
- mutations.put(key, m);
- }
- else
- {
- existing.addAll(m);
- }
+ mutations.put(key, m);
+ }
+ else
+ {
+ existing.addAll(m);
}
}
-
- return mutations.values();
}
public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
@@ -124,10 +146,22 @@ public class BatchStatement implements CQLStatement
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
- Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
+ execute(getMutations(variables, false, cl, queryState.getTimestamp()), cl);
+ return null;
+ }
+
+ public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+ {
+ if (cl == null)
+ throw new InvalidRequestException("Invalid empty consistency level");
+
+ execute(getMutations(variables, cl, queryState.getTimestamp()), cl);
+ }
+
+ private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+ {
boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
- return null;
}
public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 2ad8c72..897a3d9 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -102,6 +102,13 @@ public abstract class CBUtil
return cb;
}
+ public static ChannelBuffer byteToCB(byte b)
+ {
+ ChannelBuffer cb = ChannelBuffers.buffer(1);
+ cb.writeByte(b);
+ return cb;
+ }
+
public static ChannelBuffer intToCB(int i)
{
ChannelBuffer cb = ChannelBuffers.buffer(4);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 3121ce9..e57da51 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -69,7 +69,8 @@ public abstract class Message
PREPARE (9, Direction.REQUEST, PrepareMessage.codec),
EXECUTE (10, Direction.REQUEST, ExecuteMessage.codec),
REGISTER (11, Direction.REQUEST, RegisterMessage.codec),
- EVENT (12, Direction.RESPONSE, EventMessage.codec);
+ EVENT (12, Direction.RESPONSE, EventMessage.codec),
+ BATCH (13, Direction.REQUEST, BatchMessage.codec);
public final int opcode;
public final Direction direction;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
new file mode 100644
index 0000000..3bec918
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.MD5Digest;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class BatchMessage extends Message.Request
+{
+ public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>()
+ {
+ public BatchMessage decode(ChannelBuffer body, int version)
+ {
+ if (version == 1)
+ throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
+
+ byte type = body.readByte();
+ int n = body.readUnsignedShort();
+ List<Object> queryOrIds = new ArrayList<Object>(n);
+ List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n);
+ for (int i = 0; i < n; i++)
+ {
+ byte kind = body.readByte();
+ if (kind == 0)
+ queryOrIds.add(CBUtil.readLongString(body));
+ else if (kind == 1)
+ queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
+ else
+ throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
+
+ int count = body.readUnsignedShort();
+ List<ByteBuffer> values = count == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(count);
+ for (int j = 0; j < count; j++)
+ values.add(CBUtil.readValue(body));
+ variables.add(values);
+ }
+ ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+ return new BatchMessage(toType(type), queryOrIds, variables, consistency);
+ }
+
+ public ChannelBuffer encode(BatchMessage msg)
+ {
+ // We have:
+ // - type
+ // - Number of queries
+ // - For each query:
+ // - kind
+ // - string or id
+ // - value count
+ // - values
+ // - consistency
+ int queries = msg.queryOrIdList.size();
+ int totalValues = count(msg.values);
+
+ ChannelBuffer header = ChannelBuffers.buffer(3);
+ header.writeByte(fromType(msg.type));
+ header.writeShort(queries);
+
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2 + queries * 3, 0, totalValues);
+ builder.add(header);
+ for (int i = 0; i < queries; i++)
+ {
+ Object q = msg.queryOrIdList.get(i);
+ builder.add(CBUtil.byteToCB((byte)(q instanceof String ? 0 : 1)));
+ if (q instanceof String)
+ builder.add(CBUtil.longStringToCB((String)q));
+ else
+ builder.add(CBUtil.bytesToCB(((MD5Digest)q).bytes));
+ List<ByteBuffer> queryValues = msg.values.get(i);
+ builder.add(CBUtil.shortToCB(queryValues.size()));
+ for (ByteBuffer value : queryValues)
+ builder.addValue(value);
+ }
+
+ builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+ return builder.build();
+ }
+
+ private BatchStatement.Type toType(byte b)
+ {
+ if (b == 0)
+ return BatchStatement.Type.LOGGED;
+ else if (b == 1)
+ return BatchStatement.Type.UNLOGGED;
+ else if (b == 2)
+ return BatchStatement.Type.COUNTER;
+ else
+ throw new ProtocolException("Invalid BATCH message type " + b);
+ }
+
+ private byte fromType(BatchStatement.Type type)
+ {
+ switch (type)
+ {
+ case LOGGED: return 0;
+ case UNLOGGED: return 1;
+ case COUNTER: return 2;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ private int count(List<List<ByteBuffer>> values)
+ {
+ int count = 0;
+ for (List<ByteBuffer> l : values)
+ count += l.size();
+ return count;
+ }
+ };
+
+ public final BatchStatement.Type type;
+ public final List<Object> queryOrIdList;
+ public final List<List<ByteBuffer>> values;
+ public final ConsistencyLevel consistency;
+
+ public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency)
+ {
+ super(Message.Type.BATCH);
+ this.type = type;
+ this.queryOrIdList = queryOrIdList;
+ this.values = values;
+ this.consistency = consistency;
+ }
+
+ public ChannelBuffer encode()
+ {
+ return codec.encode(this);
+ }
+
+ public Message.Response execute(QueryState state)
+ {
+ try
+ {
+ UUID tracingId = null;
+ if (isTracingRequested())
+ {
+ tracingId = UUIDGen.getTimeUUID();
+ state.prepareTracingSession(tracingId);
+ }
+
+ if (state.traceNextQuery())
+ {
+ state.createTracingSession();
+ // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
+ Tracing.instance().begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap());
+ }
+
+ List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
+ for (int i = 0; i < queryOrIdList.size(); i++)
+ {
+ Object query = queryOrIdList.get(i);
+ CQLStatement statement;
+ if (query instanceof String)
+ {
+ statement = QueryProcessor.parseStatement((String)query, state);
+ }
+ else
+ {
+ statement = QueryProcessor.getPrepared((MD5Digest)query);
+ if (statement == null)
+ throw new PreparedQueryNotFoundException((MD5Digest)query);
+ }
+
+ List<ByteBuffer> queryValues = values.get(i);
+ if (queryValues.size() != statement.getBoundsTerms())
+ throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables",
+ statement.getBoundsTerms(),
+ queryValues.size()));
+ if (!(statement instanceof ModificationStatement))
+ throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
+
+ ModificationStatement mst = (ModificationStatement)statement;
+ if (mst.isCounter())
+ {
+ if (type != BatchStatement.Type.COUNTER)
+ throw new InvalidRequestException("Cannot include counter statement in a non-counter batch");
+ }
+ else
+ {
+ if (type == BatchStatement.Type.COUNTER)
+ throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+ }
+ statements.add(mst);
+ }
+
+ // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
+ // (and no value would be really correct, so we prefer passing a clearly wrong one).
+ BatchStatement batch = new BatchStatement(-1, type, statements, new Attributes());
+ Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values);
+
+ if (tracingId != null)
+ response.setTracingId(tracingId);
+
+ return response;
+ }
+ catch (Exception e)
+ {
+ return ErrorMessage.fromException(e);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("BATCH of [");
+ for (int i = 0; i < queryOrIdList.size(); i++)
+ {
+ if (i > 0) sb.append(", ");
+ sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values");
+ }
+ sb.append("] at consistency ").append(consistency);
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index d8d7fa2..9735654 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -44,13 +44,18 @@ public class QueryMessage extends Message.Request
{
String query = CBUtil.readLongString(body);
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- List<ByteBuffer> values = new ArrayList<ByteBuffer>();
+ List<ByteBuffer> values;
if (body.readable())
{
int paramCount = body.readUnsignedShort();
+ values = paramCount == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(paramCount);
for (int i = 0; i < paramCount; i++)
values.add(CBUtil.readValue(body));
}
+ else
+ {
+ values = Collections.emptyList();
+ }
return new QueryMessage(query, values, consistency);
}