You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/09/26 14:10:25 UTC
[3/3] git commit: Make prepared statement global instead of
per-connection
Make prepared statement global instead of per-connection
patch by slebresne; reviewed by jbellis for CASSANDRA-4449
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccca5f1e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccca5f1e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccca5f1e
Branch: refs/heads/trunk
Commit: ccca5f1e39c220ddc7ce68883622667229e28113
Parents: f6bb970
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Sep 26 14:05:09 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Sep 26 14:05:09 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol.spec | 4 +
.../org/apache/cassandra/cql3/QueryProcessor.java | 60 +++++++++---
.../apache/cassandra/exceptions/ExceptionCode.java | 3 +-
.../exceptions/PreparedQueryNotFoundException.java | 40 ++++++++
.../org/apache/cassandra/service/ClientState.java | 12 ---
.../apache/cassandra/thrift/CassandraServer.java | 9 +-
.../org/apache/cassandra/transport/CBUtil.java | 20 ++++
.../org/apache/cassandra/transport/Client.java | 3 +-
.../apache/cassandra/transport/SimpleClient.java | 2 +-
.../cassandra/transport/messages/ErrorMessage.java | 11 ++
.../transport/messages/ExecuteMessage.java | 20 +++--
.../transport/messages/PrepareMessage.java | 2 +-
.../transport/messages/ResultMessage.java | 27 ++++--
src/java/org/apache/cassandra/utils/MD5Digest.java | 75 +++++++++++++++
15 files changed, 242 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7314f99..b70c412 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* adjust blockFor calculation to account for pending ranges due to node
movement (CASSANDRA-833)
* Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649)
+ * Make prepared statements global instead of per-connection (CASSANDRA-4449)
1.2-beta1
* add atomic_batch_mutate (CASSANDRA-4542, -4635)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 0dd5c14..9a44697 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -520,3 +520,7 @@ Table of Contents
already exists. If the query was attempting to create a
keyspace, <table> will be present but will be the empty
string.
+ 0x2500 Unprepared: Can be thrown when a prepared statement is
+ executed if the provided prepared statement ID is not known by
+ this host. The rest of the ERROR message body will be [bytes]
+ representing the unknown ID.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 146f775..856f6fd 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
import java.util.*;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.antlr.runtime.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.utils.SemanticVersion;
public class QueryProcessor
@@ -42,6 +44,26 @@ public class QueryProcessor
private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
+ public static final int MAX_CACHE_PREPARED = 100000; // Enough to keep buggy clients from OOM'ing us
+ private static final Map<MD5Digest, CQLStatement> preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()
+ .maximumWeightedCapacity(MAX_CACHE_PREPARED)
+ .build();
+
+ private static final Map<Integer, CQLStatement> thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>()
+ .maximumWeightedCapacity(MAX_CACHE_PREPARED)
+ .build();
+
+
+ public static CQLStatement getPrepared(MD5Digest id)
+ {
+ return preparedStatements.get(id);
+ }
+
+ public static CQLStatement getPrepared(Integer id)
+ {
+ return thriftPreparedStatements.get(id);
+ }
+
public static void validateKey(ByteBuffer key) throws InvalidRequestException
{
if (key == null || key.remaining() == 0)
@@ -151,20 +173,38 @@ public class QueryProcessor
}
}
- public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
+ public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift)
throws RequestValidationException
{
logger.trace("CQL QUERY: {}", queryString);
ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
- int statementId = makeStatementId(queryString);
- clientState.getCQL3Prepared().put(statementId, prepared.statement);
- logger.trace(String.format("Stored prepared statement #%d with %d bind markers",
- statementId,
- prepared.statement.getBoundsTerms()));
+ ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState, prepared, forThrift);
assert prepared.statement.getBoundsTerms() == prepared.boundNames.size();
- return new ResultMessage.Prepared(statementId, prepared.boundNames);
+ return msg;
+ }
+
+ private static ResultMessage.Prepared storePreparedStatement(String queryString, ClientState clientState, ParsedStatement.Prepared prepared, boolean forThrift)
+ {
+ if (forThrift)
+ {
+ int statementId = queryString.hashCode();
+ thriftPreparedStatements.put(statementId, prepared.statement);
+ logger.trace(String.format("Stored prepared statement #%d with %d bind markers",
+ statementId,
+ prepared.statement.getBoundsTerms()));
+ return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames);
+ }
+ else
+ {
+ MD5Digest statementId = MD5Digest.compute(queryString);
+ logger.trace(String.format("Stored prepared statement %s with %d bind markers",
+ statementId,
+ prepared.statement.getBoundsTerms()));
+ preparedStatements.put(statementId, prepared.statement);
+ return new ResultMessage.Prepared(statementId, prepared.boundNames);
+ }
}
public static ResultMessage processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables)
@@ -188,12 +228,6 @@ public class QueryProcessor
return processStatement(statement, clientState, variables);
}
- private static final int makeStatementId(String cql)
- {
- // use the hash of the string till something better is provided
- return cql.hashCode();
- }
-
private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)
throws RequestValidationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 13fcc6a..e8dfb4e 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -43,7 +43,8 @@ public enum ExceptionCode
UNAUTHORIZED (0x2100),
INVALID (0x2200),
CONFIG_ERROR (0x2300),
- ALREADY_EXISTS (0x2400);
+ ALREADY_EXISTS (0x2400),
+ UNPREPARED (0x2500);
public final int value;
private static final Map<Integer, ExceptionCode> valueToCode = new HashMap<Integer, ExceptionCode>(ExceptionCode.values().length);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
new file mode 100644
index 0000000..07502c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.utils.MD5Digest;
+
+public class PreparedQueryNotFoundException extends RequestValidationException
+{
+ public final MD5Digest id;
+
+ public PreparedQueryNotFoundException(MD5Digest id)
+ {
+ super(ExceptionCode.UNPREPARED, makeMsg(id));
+ this.id = id;
+ }
+
+ private static String makeMsg(MD5Digest id)
+ {
+ return String.format("Prepared query with ID %d not found" +
+ " (either the query was not prepared on this host (maybe the host has been restarted?)" +
+ " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
+ id, QueryProcessor.MAX_CACHE_PREPARED, id);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index ef640af..6266a3b 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -65,12 +65,6 @@ public class ClientState
}
};
- private final Map<Integer, org.apache.cassandra.cql3.CQLStatement> cql3Prepared = new LinkedHashMap<Integer, org.apache.cassandra.cql3.CQLStatement>(16, 0.75f, true) {
- protected boolean removeEldestEntry(Map.Entry<Integer, org.apache.cassandra.cql3.CQLStatement> eldest) {
- return size() > MAX_CACHE_PREPARED;
- }
- };
-
private long clock;
// internalCall is used to mark ClientState as used by some internal component
@@ -96,11 +90,6 @@ public class ClientState
return prepared;
}
- public Map<Integer, org.apache.cassandra.cql3.CQLStatement> getCQL3Prepared()
- {
- return cql3Prepared;
- }
-
public String getRawKeyspace()
{
return keyspace;
@@ -191,7 +180,6 @@ public class ClientState
preparedTracingSession = null;
resourceClear();
prepared.clear();
- cql3Prepared.clear();
}
public void hasKeyspaceAccess(String keyspace, Permission perm) throws UnauthorizedException, InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 73ecd29..4ab19bb 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1705,7 +1705,7 @@ public class CassandraServer implements Cassandra.Iface
if (cState.getCQLVersion().major == 2)
return QueryProcessor.prepare(queryString, cState);
else
- return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult();
+ return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult();
}
catch (RequestValidationException e)
{
@@ -1741,10 +1741,13 @@ public class CassandraServer implements Cassandra.Iface
}
else
{
- org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+ org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId);
if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+ throw new InvalidRequestException(String.format("Prepared query with ID %d not found" +
+ " (either the query was not prepared on this host (maybe the host has been restarted?)" +
+ " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)",
+ itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
statement.getBoundsTerms());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index b977f35..fe8863a 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -111,6 +111,26 @@ public abstract class CBUtil
return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes);
}
+ public static ChannelBuffer bytesToCB(byte[] bytes)
+ {
+ return ChannelBuffers.wrappedBuffer(shortToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes));
+ }
+
+ public static byte[] readBytes(ChannelBuffer cb)
+ {
+ try
+ {
+ int length = cb.readUnsignedShort();
+ byte[] bytes = new byte[length];
+ cb.readBytes(bytes);
+ return bytes;
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ throw new ProtocolException("Not enough bytes to read a byte array preceded by it's 2 bytes length");
+ }
+ }
+
public static ChannelBuffer longStringToCB(String str)
{
ChannelBuffer bytes = bytes(str);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index b9e00fa..3b4ace9 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -27,6 +27,7 @@ import com.google.common.base.Splitter;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.utils.Hex;
public class Client extends SimpleClient
{
@@ -109,7 +110,7 @@ public class Client extends SimpleClient
{
try
{
- int id = Integer.parseInt(iter.next());
+ byte[] id = Hex.hexToBytes(iter.next());
List<ByteBuffer> values = new ArrayList<ByteBuffer>();
while(iter.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ea0a3df..8132e65 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -126,7 +126,7 @@ public class SimpleClient
return (ResultMessage.Prepared)msg;
}
- public ResultMessage executePrepared(int statementId, List<ByteBuffer> values)
+ public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values)
{
Message.Response msg = execute(new ExecuteMessage(statementId, values));
assert msg instanceof ResultMessage;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index ecb387b..8ed8e94 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.transport.ServerError;
import org.apache.cassandra.thrift.AuthenticationException;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MD5Digest;
/**
* Message to indicate an error to the client.
@@ -91,6 +92,12 @@ public class ErrorMessage extends Message.Response
te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0);
}
break;
+ case UNPREPARED:
+ {
+ MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+ te = new PreparedQueryNotFoundException(id);
+ }
+ break;
case SYNTAX_ERROR:
te = new SyntaxException(msg);
break;
@@ -145,6 +152,10 @@ public class ErrorMessage extends Message.Response
if (readEx != null)
acb.writeByte((byte)(readEx.dataPresent ? 1 : 0));
break;
+ case UNPREPARED:
+ PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;
+ acb = CBUtil.bytesToCB(pqnfe.id.bytes);
+ break;
case ALREADY_EXISTS:
AlreadyExistsException aee = (AlreadyExistsException)msg.error;
acb = ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(aee.ksName),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 4172862..4400d12 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -25,8 +25,9 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.MD5Digest;
public class ExecuteMessage extends Message.Request
{
@@ -34,7 +35,7 @@ public class ExecuteMessage extends Message.Request
{
public ExecuteMessage decode(ChannelBuffer body)
{
- int id = body.readInt();
+ byte[] id = CBUtil.readBytes(body);
int count = body.readUnsignedShort();
List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
@@ -53,7 +54,7 @@ public class ExecuteMessage extends Message.Request
// - options
int vs = msg.values.size();
CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs);
- builder.add(CBUtil.intToCB(msg.statementId));
+ builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
builder.add(CBUtil.shortToCB(vs));
// Values
@@ -64,10 +65,15 @@ public class ExecuteMessage extends Message.Request
}
};
- public final int statementId;
+ public final MD5Digest statementId;
public final List<ByteBuffer> values;
- public ExecuteMessage(int statementId, List<ByteBuffer> values)
+ public ExecuteMessage(byte[] statementId, List<ByteBuffer> values)
+ {
+ this(MD5Digest.wrap(statementId), values);
+ }
+
+ public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values)
{
super(Message.Type.EXECUTE);
this.statementId = statementId;
@@ -84,10 +90,10 @@ public class ExecuteMessage extends Message.Request
try
{
ServerConnection c = (ServerConnection)connection;
- CQLStatement statement = c.clientState().getCQL3Prepared().get(statementId);
+ CQLStatement statement = QueryProcessor.getPrepared(statementId);
if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found", statementId));
+ throw new PreparedQueryNotFoundException(statementId);
return QueryProcessor.processPrepared(statement, c.clientState(), values);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 5c2636a..382e834 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -55,7 +55,7 @@ public class PrepareMessage extends Message.Request
{
try
{
- return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState());
+ return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState(), false);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 6b63948..d5009e9 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.thrift.CqlPreparedResult;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.utils.MD5Digest;
public abstract class ResultMessage extends Message.Response
{
@@ -248,30 +249,40 @@ public abstract class ResultMessage extends Message.Response
{
public ResultMessage decode(ChannelBuffer body)
{
- int id = body.readInt();
- return new Prepared(id, ResultSet.Metadata.codec.decode(body));
+ MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body));
+ return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body));
}
public ChannelBuffer encode(ResultMessage msg)
{
assert msg instanceof Prepared;
Prepared prepared = (Prepared)msg;
- return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId), ResultSet.Metadata.codec.encode(prepared.metadata));
+ assert prepared.statementId != null;
+ return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata));
}
};
- public final int statementId;
+ public final MD5Digest statementId;
public final ResultSet.Metadata metadata;
- public Prepared(int statementId, List<ColumnSpecification> names)
+ // Statement id for CQL-over-thrift compatibility. The binary protocol ignores it.
+ private final int thriftStatementId;
+
+ public Prepared(MD5Digest statementId, List<ColumnSpecification> names)
+ {
+ this(statementId, -1, new ResultSet.Metadata(names));
+ }
+
+ public static Prepared forThrift(int statementId, List<ColumnSpecification> names)
{
- this(statementId, new ResultSet.Metadata(names));
+ return new Prepared(null, statementId, new ResultSet.Metadata(names));
}
- private Prepared(int statementId, ResultSet.Metadata metadata)
+ private Prepared(MD5Digest statementId, int thriftStatementId, ResultSet.Metadata metadata)
{
super(Kind.PREPARED);
this.statementId = statementId;
+ this.thriftStatementId = thriftStatementId;
this.metadata = metadata;
}
@@ -294,7 +305,7 @@ public abstract class ResultMessage extends Message.Response
namesString.add(name.toString());
typesString.add(TypeParser.getShortName(name.type));
}
- return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
+ return new CqlPreparedResult(thriftStatementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/utils/MD5Digest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java
new file mode 100644
index 0000000..59c1aba
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MD5Digest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+
+/**
+ * The result of the computation of an MD5 digest.
+ *
+ * An MD5 is really just a byte[], but arrays are a no-go as map keys. We could
+ * wrap it in a ByteBuffer but:
+ * 1. MD5Digest is a more explicit name than ByteBuffer to represent an MD5.
+ * 2. Using our own class allows us to use our FastByteComparison for equals.
+ */
+public class MD5Digest
+{
+ public final byte[] bytes;
+
+ private MD5Digest(byte[] bytes)
+ {
+ this.bytes = bytes;
+ }
+
+ public static MD5Digest wrap(byte[] digest)
+ {
+ return new MD5Digest(digest);
+ }
+
+ public static MD5Digest compute(byte[] toHash)
+ {
+ return new MD5Digest(FBUtilities.threadLocalMD5Digest().digest(toHash));
+ }
+
+ public static MD5Digest compute(String toHash)
+ {
+ return compute(toHash.getBytes());
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Arrays.hashCode(bytes);
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof MD5Digest))
+ return false;
+ MD5Digest that = (MD5Digest)o;
+ // handles nulls properly
+ return FBUtilities.compareUnsigned(this.bytes, that.bytes, 0, 0, this.bytes.length, that.bytes.length) == 0;
+ }
+
+ @Override
+ public String toString()
+ {
+ return Hex.bytesToHex(bytes);
+ }
+}