You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/04/12 18:58:16 UTC
cassandra git commit: Change protocol to allow sending key space
independent of query string
Repository: cassandra
Updated Branches:
refs/heads/trunk 1096f9f5e -> 1f533260a
Change protocol to allow sending key space independent of query string
patch by Sandeep Tamhankar; reviewed by Tyler Hobbs + Robert Stupp for CASSANDRA-10145
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f533260
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f533260
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f533260
Branch: refs/heads/trunk
Commit: 1f533260a01552790aff0f5f2f8f2f0aee8dbf10
Parents: 1096f9f
Author: Sandeep Tamhankar <sa...@datastax.com>
Authored: Wed Apr 12 20:56:35 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Apr 12 20:56:35 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/native_protocol_v5.spec | 25 +++++-
.../cassandra/cql3/BatchQueryOptions.java | 5 ++
.../CustomPayloadMirroringQueryHandler.java | 5 +-
.../org/apache/cassandra/cql3/QueryHandler.java | 3 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 37 +++++---
.../apache/cassandra/cql3/QueryProcessor.java | 24 ++----
.../cassandra/cql3/statements/CFStatement.java | 8 +-
.../cql3/statements/ParsedStatement.java | 2 +-
.../apache/cassandra/service/ClientState.java | 24 ++++++
.../org/apache/cassandra/transport/Client.java | 4 +-
.../cassandra/transport/SimpleClient.java | 2 +-
.../transport/messages/BatchMessage.java | 3 +-
.../transport/messages/PrepareMessage.java | 43 +++++++++-
.../org/apache/cassandra/cql3/CQLTester.java | 2 +-
.../cassandra/transport/MessagePayloadTest.java | 89 +++++++++++++++++++-
.../cassandra/transport/SerDeserTest.java | 32 +++++--
17 files changed, 249 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f3cb3b..dd33fcf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
* Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
* Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
* Skip building views during base table streams on range movements (CASSANDRA-13065)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index ac3373c..320f6c0 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -332,7 +332,7 @@ Table of Contents
<query><query_parameters>
where <query> is a [long string] representing the query and
<query_parameters> must be
- <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
+ <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>][<keyspace>]
where:
- <consistency> is the [consistency] level for the operation.
- <flags> is a [int] whose bits define the options for this query and
@@ -375,6 +375,9 @@ Table of Contents
since the names for the expected values were returned during preparation,
a client can always provide values in the right order without any names
and using this flag, while supported, is almost surely inefficient.
+ 0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+ [string] indicating the keyspace that the query should be executed in.
+ It supersedes the keyspace that the connection is bound to, if any.
Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
TRUNCATE, ...).
@@ -385,8 +388,17 @@ Table of Contents
4.1.5. PREPARE
- Prepare a query for later execution (through EXECUTE). The body consists of
- the CQL query to prepare as a [long string].
+ Prepare a query for later execution (through EXECUTE). The body of the message must be:
+ <query><flags>[<keyspace>]
+ where:
+ - <query> is a [long string] representing the CQL query.
+ - <flags> is a [int] whose bits define the options for this statement and in particular
+ influence what the remainder of the message contains.
+ A flag is set if the bit corresponding to its `mask` is set. Supported
+ flags are, given their mask:
+ 0x01: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+ [string] indicating the keyspace that the query should be executed in.
+ It supersedes the keyspace that the connection is bound to, if any.
The server will respond with a RESULT message with a `prepared` kind (0x0004,
see Section 4.2.5).
@@ -408,7 +420,7 @@ Table of Contents
Allows executing a list of queries (prepared or not) as a batch (note that
only DML statements are accepted in a batch). The body of the message must
be:
- <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
+ <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>][<keyspace>]
where:
- <type> is a [byte] indicating the type of batch to use:
- If <type> == 0, the batch will be "logged". This is equivalent to a
@@ -440,6 +452,9 @@ Table of Contents
to implement. This will be fixed in a future version of the native
protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for
more details].
+ 0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+ [string] indicating the keyspace that the query should be executed in.
+ It supersedes the keyspace that the connection is bound to, if any.
- <n> is a [short] indicating the number of following queries.
- <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
form:
@@ -1211,3 +1226,5 @@ Table of Contents
* Enlarged flag's bitmaps for QUERY, EXECUTE and BATCH messages from [byte] to [int]
(Sections 4.1.4, 4.1.6 and 4.1.7).
* Add the duration data type
+ * Added keyspace field in QUERY, PREPARE, and BATCH messages (Sections 4.1.4, 4.1.5, and 4.1.7).
+ * Added [int] flags field in PREPARE message (Section 4.1.5).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index db7fa39..3d3cda0 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -62,6 +62,11 @@ public abstract class BatchQueryOptions
return wrapped.getConsistency();
}
+ public String getKeyspace()
+ {
+ return wrapped.getKeyspace();
+ }
+
public ConsistencyLevel getSerialConsistency()
{
return wrapped.getSerialConsistency();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
index aa8ca48..32cddba 100644
--- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.MD5Digest;
@@ -46,9 +47,9 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
return result;
}
- public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, ByteBuffer> customPayload)
+ public ResultMessage.Prepared prepare(String query, ClientState clientState, Map<String, ByteBuffer> customPayload)
{
- ResultMessage.Prepared prepared = queryProcessor.prepare(query, state, customPayload);
+ ResultMessage.Prepared prepared = queryProcessor.prepare(query, clientState, customPayload);
prepared.setCustomPayload(customPayload);
return prepared;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 0339d26..d3b41f0 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.MD5Digest;
@@ -37,7 +38,7 @@ public interface QueryHandler
long queryStartNanoTime) throws RequestExecutionException, RequestValidationException;
ResultMessage.Prepared prepare(String query,
- QueryState state,
+ ClientState clientState,
Map<String, ByteBuffer> customPayload) throws RequestValidationException;
ParsedStatement.Prepared getPrepared(MD5Digest id);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index afe20d7..01df691 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -67,9 +67,9 @@ public abstract class QueryOptions
return new DefaultQueryOptions(null, null, true, null, protocolVersion);
}
- public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version)
+ public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version, String keyspace)
{
- return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), version);
+ return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L, keyspace), version);
}
public static QueryOptions addColumnSpecifications(QueryOptions options, List<ColumnSpecification> columnSpecs)
@@ -86,11 +86,11 @@ public abstract class QueryOptions
*
* This is functionally equivalent to:
* {@code Json.parseJson(UTF8Type.instance.getSerializer().deserialize(getValues().get(bindIndex)), expectedReceivers).get(columnName)}
- * but this cache the result of parsing the JSON so that while this might be called for multiple columns on the same {@code bindIndex}
+ * but this caches the result of parsing the JSON, so that while this might be called for multiple columns on the same {@code bindIndex}
* value, the underlying JSON value is only parsed/processed once.
*
- * Note: this is a bit more involved in CQL specifics than this class generally is but we as we need to cache this per-query and in an object
- * that is available when we bind values, this is the easier place to have this.
+ * Note: this is a bit more involved in CQL specifics than this class generally is, but as we need to cache this per-query and in an object
+ * that is available when we bind values, this is the easiest place to have this.
*
* @param bindIndex the index of the bind value that should be interpreted as a JSON value.
* @param columnName the name of the column we want the value of.
@@ -136,7 +136,7 @@ public abstract class QueryOptions
*
* <p>The column specifications will be present only for prepared statements.</p>
*
- * <p>Invoke the {@link hasColumnSpecifications} method before invoking this method in order to ensure that this
+ * <p>Invoke the {@link #hasColumnSpecifications} method before invoking this method in order to ensure that this
* <code>QueryOptions</code> contains the column specifications.</p>
*
* @return the option names
@@ -172,6 +172,9 @@ public abstract class QueryOptions
return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp();
}
+ /** The keyspace that this query is bound to, or null if not relevant. */
+ public String getKeyspace() { return getSpecificOptions().keyspace; }
+
/**
* The protocol version for the query.
*/
@@ -314,7 +317,7 @@ public abstract class QueryOptions
{
super.prepare(specs);
- orderedValues = new ArrayList<ByteBuffer>(specs.size());
+ orderedValues = new ArrayList<>(specs.size());
for (int i = 0; i < specs.size(); i++)
{
String name = specs.get(i).name.toString();
@@ -341,19 +344,21 @@ public abstract class QueryOptions
// Options that are likely to not be present in most queries
static class SpecificOptions
{
- private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE);
+ private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null);
private final int pageSize;
private final PagingState state;
private final ConsistencyLevel serialConsistency;
private final long timestamp;
+ private final String keyspace;
- private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
+ private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp, String keyspace)
{
this.pageSize = pageSize;
this.state = state;
this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
this.timestamp = timestamp;
+ this.keyspace = keyspace;
}
}
@@ -368,7 +373,8 @@ public abstract class QueryOptions
PAGING_STATE,
SERIAL_CONSISTENCY,
TIMESTAMP,
- NAMES_FOR_VALUES;
+ NAMES_FOR_VALUES,
+ KEYSPACE;
private static final Flag[] ALL_VALUES = values();
@@ -433,8 +439,8 @@ public abstract class QueryOptions
throw new ProtocolException(String.format("Out of bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1, Long.MAX_VALUE, ts));
timestamp = ts;
}
-
- options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
+ String keyspace = flags.contains(Flag.KEYSPACE) ? CBUtil.readString(body) : null;
+ options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace);
}
DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
return names == null ? opts : new OptionsWithNames(opts, names);
@@ -460,6 +466,8 @@ public abstract class QueryOptions
CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
if (flags.contains(Flag.TIMESTAMP))
dest.writeLong(options.getSpecificOptions().timestamp);
+ if (flags.contains(Flag.KEYSPACE))
+ CBUtil.writeString(options.getSpecificOptions().keyspace, dest);
// Note that we don't really have to bother with NAMES_FOR_VALUES server side,
// and in fact we never really encode QueryOptions, only decode them, so we
@@ -485,7 +493,8 @@ public abstract class QueryOptions
size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
if (flags.contains(Flag.TIMESTAMP))
size += 8;
-
+ if (flags.contains(Flag.KEYSPACE))
+ size += CBUtil.sizeOfString(options.getSpecificOptions().keyspace);
return size;
}
@@ -504,6 +513,8 @@ public abstract class QueryOptions
flags.add(Flag.SERIAL_CONSISTENCY);
if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
flags.add(Flag.TIMESTAMP);
+ if (options.getSpecificOptions().keyspace != null)
+ flags.add(Flag.KEYSPACE);
return flags;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 4aa2026..cca93ff 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -219,7 +219,7 @@ public class QueryProcessor implements QueryHandler
public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
- ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+ ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
options.prepare(p.boundNames);
CQLStatement prepared = p.statement;
if (prepared.getBoundTerms() != options.getValues().size())
@@ -231,9 +231,9 @@ public class QueryProcessor implements QueryHandler
return processStatement(prepared, queryState, options, queryStartNanoTime);
}
- public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ public static ParsedStatement.Prepared parseStatement(String queryStr, ClientState clientState) throws RequestValidationException
{
- return getStatement(queryStr, queryState.getClientState());
+ return getStatement(queryStr, clientState);
}
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
@@ -277,7 +277,7 @@ public class QueryProcessor implements QueryHandler
return prepared;
// Note: if 2 threads prepare the same query, we'll live so don't bother synchronizing
- prepared = parseStatement(query, internalQueryState());
+ prepared = parseStatement(query, internalQueryState().getClientState());
prepared.statement.validate(internalQueryState().getClientState());
internalStatements.putIfAbsent(query, prepared);
return prepared;
@@ -334,7 +334,7 @@ public class QueryProcessor implements QueryHandler
*/
public static UntypedResultSet executeOnceInternal(String query, Object... values)
{
- ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState());
+ ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState().getClientState());
prepared.statement.validate(internalQueryState().getClientState());
ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values));
if (result instanceof ResultMessage.Rows)
@@ -374,16 +374,10 @@ public class QueryProcessor implements QueryHandler
}
public ResultMessage.Prepared prepare(String query,
- QueryState state,
+ ClientState clientState,
Map<String, ByteBuffer> customPayload) throws RequestValidationException
{
- return prepare(query, state);
- }
-
- public ResultMessage.Prepared prepare(String queryString, QueryState queryState)
- {
- ClientState cState = queryState.getClientState();
- return prepare(queryString, cState);
+ return prepare(query, clientState);
}
public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
@@ -485,7 +479,7 @@ public class QueryProcessor implements QueryHandler
public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options, long queryStartNanoTime)
throws RequestExecutionException, RequestValidationException
{
- ClientState clientState = queryState.getClientState();
+ ClientState clientState = queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace());
batch.checkAccess(clientState);
batch.validate();
batch.validate(clientState);
@@ -500,7 +494,7 @@ public class QueryProcessor implements QueryHandler
// Set keyspace for statement that require login
if (statement instanceof CFStatement)
- ((CFStatement)statement).prepareKeyspace(clientState);
+ ((CFStatement) statement).prepareKeyspace(clientState);
Tracing.trace("Preparing statement");
return statement.prepare();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
index 9b2987c..136860e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
@@ -37,14 +37,16 @@ public abstract class CFStatement extends ParsedStatement
{
if (!cfName.hasKeyspace())
{
- // XXX: We explicitely only want to call state.getKeyspace() in this case, as we don't want to throw
- // if not logged in any keyspace but a keyspace is explicitely set on the statement. So don't move
+ // XXX: We explicitly only want to call state.getKeyspace() in this case, as we don't want to throw
+ // if not logged in any keyspace but a keyspace is explicitly set on the statement. So don't move
// the call outside the 'if' or replace the method by 'prepareKeyspace(state.getKeyspace())'
cfName.setKeyspace(state.getKeyspace(), true);
}
}
- // Only for internal calls, use the version with ClientState for user queries
+ // Only for internal calls, use the version with ClientState for user queries. In particular, the
+ // version with ClientState throws an exception if the statement does not have keyspace set *and*
+ // ClientState has no keyspace.
public void prepareKeyspace(String keyspace)
{
if (!cfName.hasKeyspace())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index 0cd549a..e617ba7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@ -51,7 +51,7 @@ public abstract class ParsedStatement
/**
* Contains the CQL statement source if the statement has been "regularly" prepared via
* {@link org.apache.cassandra.cql3.QueryProcessor#prepare(java.lang.String, org.apache.cassandra.service.ClientState)} /
- * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.QueryState, java.util.Map)}.
+ * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.ClientState, java.util.Map)}.
* Other usages of this class may or may not contain the CQL statement source.
*/
public String rawCQLStatement;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 80cf810..dfddccd 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -139,6 +139,14 @@ public class ClientState
this.user = AuthenticatedUser.ANONYMOUS_USER;
}
+ protected ClientState(ClientState source)
+ {
+ this.isInternal = source.isInternal;
+ this.remoteAddress = source.remoteAddress;
+ this.user = source.user;
+ this.keyspace = source.keyspace;
+ }
+
/**
* @return a ClientState object for internal C* calls (not limited by any kind of auth).
*/
@@ -156,6 +164,22 @@ public class ClientState
}
/**
+ * Clone this ClientState object, but use the provided keyspace instead of the
+ * keyspace in this ClientState object.
+ *
+ * @return a new ClientState object if the keyspace argument is non-null. Otherwise do not clone
+ * and return this ClientState object.
+ */
+ public ClientState cloneWithKeyspaceIfSet(String keyspace)
+ {
+ if (keyspace == null)
+ return this;
+ ClientState clientState = new ClientState(this);
+ clientState.setKeyspace(keyspace);
+ return clientState;
+ }
+
+ /**
* This clock guarantees that updates for the same ClientState will be ordered
* in the sequence seen, even if multiple updates happen in the same millisecond.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index e428b06..9a76e03 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -136,12 +136,12 @@ public class Client extends SimpleClient
return null;
}
}
- return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version));
+ return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version, null));
}
else if (msgType.equals("PREPARE"))
{
String query = line.substring(8);
- return new PrepareMessage(query);
+ return new PrepareMessage(query, null);
}
else if (msgType.equals("EXECUTE"))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 1bb081b..13cd9bd 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -189,7 +189,7 @@ public class SimpleClient implements Closeable
public ResultMessage.Prepared prepare(String query)
{
- Message.Response msg = execute(new PrepareMessage(query));
+ Message.Response msg = execute(new PrepareMessage(query, null));
assert msg instanceof ResultMessage.Prepared;
return (ResultMessage.Prepared)msg;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index bb6411f..d9123d4 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -180,7 +180,8 @@ public class BatchMessage extends Message.Request
ParsedStatement.Prepared p;
if (query instanceof String)
{
- p = QueryProcessor.parseStatement((String)query, state);
+ p = QueryProcessor.parseStatement((String)query,
+ state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index b0c9dbe..bb0fc3a 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -36,26 +36,59 @@ public class PrepareMessage extends Message.Request
public PrepareMessage decode(ByteBuf body, ProtocolVersion version)
{
String query = CBUtil.readLongString(body);
- return new PrepareMessage(query);
+ String keyspace = null;
+ if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) {
+ // If flags grows, we may want to consider creating a PrepareOptions class with an internal codec
+ // class that handles flags and options of the prepare message. Since there's only one right now,
+ // we just take care of business here.
+
+ int flags = (int)body.readUnsignedInt();
+ if ((flags & 0x1) == 0x1)
+ keyspace = CBUtil.readString(body);
+ }
+ return new PrepareMessage(query, keyspace);
}
public void encode(PrepareMessage msg, ByteBuf dest, ProtocolVersion version)
{
CBUtil.writeLongString(msg.query, dest);
+ if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+ {
+ // If we have no keyspace, write out a 0-valued flag field.
+ if (msg.keyspace == null)
+ dest.writeInt(0x0);
+ else {
+ dest.writeInt(0x1);
+ CBUtil.writeString(msg.keyspace, dest);
+ }
+ }
}
public int encodedSize(PrepareMessage msg, ProtocolVersion version)
{
- return CBUtil.sizeOfLongString(msg.query);
+ int size = CBUtil.sizeOfLongString(msg.query);
+ if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+ {
+ // We always emit a flags int
+ size += 4;
+
+ // If we have a keyspace, we'd write it out. Otherwise, we'd write nothing.
+ size += msg.keyspace == null
+ ? 0
+ : CBUtil.sizeOfString(msg.keyspace);
+ }
+ return size;
}
};
private final String query;
+ private final String keyspace;
- public PrepareMessage(String query)
+ public PrepareMessage(String query, String keyspace)
{
super(Message.Type.PREPARE);
this.query = query;
+ this.keyspace = keyspace;
}
public Message.Response execute(QueryState state, long queryStartNanoTime)
@@ -75,7 +108,9 @@ public class PrepareMessage extends Message.Request
Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query));
}
- Message.Response response = ClientState.getCQLQueryHandler().prepare(query, state, getCustomPayload());
+ Message.Response response = ClientState.getCQLQueryHandler().prepare(query,
+ state.getClientState().cloneWithKeyspaceIfSet(keyspace),
+ getCustomPayload());
if (tracingId != null)
response.setTracingId(tracingId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index c247d96..26437c9 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -710,7 +710,7 @@ public abstract class CQLTester
state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME);
QueryState queryState = new QueryState(state);
- ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState);
+ ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState.getClientState());
prepared.statement.validate(state);
QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index cfbfd39..c27593b 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.MD5Digest;
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class MessagePayloadTest extends CQLTester
@@ -112,6 +113,86 @@ public class MessagePayloadTest extends CQLTester
}
@Test
+ public void testMessagePayloadBeta() throws Throwable
+ {
+ QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null);
+ cqlQueryHandlerField.set(null, new TestQueryHandler());
+ try
+ {
+ requireNetwork();
+
+ Assert.assertSame(TestQueryHandler.class, ClientState.getCQLQueryHandler().getClass());
+
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new ClientEncryptionOptions());
+ try
+ {
+ client.connect(false);
+
+ Map<String, ByteBuffer> reqMap;
+ Map<String, ByteBuffer> respMap;
+
+ QueryOptions queryOptions = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ ProtocolVersion.V5,
+ KEYSPACE);
+ QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+ queryOptions);
+ PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM atable", KEYSPACE);
+
+ reqMap = Collections.singletonMap("foo", bytes(42));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(42));
+ queryMessage.setCustomPayload(reqMap);
+ Message.Response queryResponse = client.execute(queryMessage);
+ payloadEquals(reqMap, requestPayload);
+ payloadEquals(respMap, queryResponse.getCustomPayload());
+
+ reqMap = Collections.singletonMap("foo", bytes(43));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(43));
+ prepareMessage.setCustomPayload(reqMap);
+ ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
+ payloadEquals(reqMap, requestPayload);
+ payloadEquals(respMap, prepareResponse.getCustomPayload());
+
+ ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+ reqMap = Collections.singletonMap("foo", bytes(44));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(44));
+ executeMessage.setCustomPayload(reqMap);
+ Message.Response executeResponse = client.execute(executeMessage);
+ payloadEquals(reqMap, requestPayload);
+ payloadEquals(respMap, executeResponse.getCustomPayload());
+
+ BatchMessage batchMessage = new BatchMessage(BatchStatement.Type.UNLOGGED,
+ Collections.<Object>singletonList("INSERT INTO atable (pk,v) VALUES (1, 'foo')"),
+ Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+ queryOptions);
+ reqMap = Collections.singletonMap("foo", bytes(45));
+ responsePayload = respMap = Collections.singletonMap("bar", bytes(45));
+ batchMessage.setCustomPayload(reqMap);
+ Message.Response batchResponse = client.execute(batchMessage);
+ payloadEquals(reqMap, requestPayload);
+ payloadEquals(respMap, batchResponse.getCustomPayload());
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+ finally
+ {
+ cqlQueryHandlerField.set(null, queryHandler);
+ }
+ }
+
+ @Test
public void testMessagePayload() throws Throwable
{
QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null);
@@ -134,7 +215,7 @@ public class MessagePayloadTest extends CQLTester
"CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
QueryOptions.DEFAULT
);
- PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
+ PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null);
reqMap = Collections.singletonMap("foo", bytes(42));
responsePayload = respMap = Collections.singletonMap("bar", bytes(42));
@@ -202,7 +283,7 @@ public class MessagePayloadTest extends CQLTester
"CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
QueryOptions.DEFAULT
);
- PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
+ PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null);
reqMap = Collections.singletonMap("foo", bytes(42));
responsePayload = Collections.singletonMap("bar", bytes(42));
@@ -293,13 +374,13 @@ public class MessagePayloadTest extends CQLTester
}
public ResultMessage.Prepared prepare(String query,
- QueryState state,
+ ClientState clientState,
Map<String, ByteBuffer> customPayload)
throws RequestValidationException
{
if (customPayload != null)
requestPayload = customPayload;
- ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, state, customPayload);
+ ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, clientState, customPayload);
if (customPayload != null)
{
result.setCustomPayload(responsePayload);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index a1260ba..2592ae7 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -314,15 +314,30 @@ public class SerDeserTest
private void queryOptionsSerDeserTest(ProtocolVersion version) throws Exception
{
- QueryOptions options = QueryOptions.create(ConsistencyLevel.ALL,
- Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
- false,
- 5000,
- Util.makeSomePagingState(version),
- ConsistencyLevel.SERIAL,
- version
- );
+ queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.ALL,
+ Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
+ false,
+ 5000,
+ Util.makeSomePagingState(version),
+ ConsistencyLevel.SERIAL,
+ version,
+ null
+ ));
+
+ queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.LOCAL_ONE,
+ Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }),
+ ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })),
+ true,
+ 10,
+ Util.makeSomePagingState(version),
+ ConsistencyLevel.SERIAL,
+ version,
+ "some_keyspace"
+ ));
+ }
+ private void queryOptionsSerDeserTest(ProtocolVersion version, QueryOptions options)
+ {
ByteBuf buf = Unpooled.buffer(QueryOptions.codec.encodedSize(options, version));
QueryOptions.codec.encode(options, buf, version);
QueryOptions decodedOptions = QueryOptions.codec.decode(buf, version);
@@ -335,5 +350,6 @@ public class SerDeserTest
assertEquals(options.getValues(), decodedOptions.getValues());
assertEquals(options.getPagingState(), decodedOptions.getPagingState());
assertEquals(options.skipMetadata(), decodedOptions.skipMetadata());
+ assertEquals(options.getKeyspace(), decodedOptions.getKeyspace());
}
}