You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/04/12 18:58:16 UTC

cassandra git commit: Change protocol to allow sending key space independent of query string

Repository: cassandra
Updated Branches:
  refs/heads/trunk 1096f9f5e -> 1f533260a


Change protocol to allow sending key space independent of query string

patch by Sandeep Tamhankar; reviewed by Tyler Hobbs + Robert Stupp for CASSANDRA-10145


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f533260
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f533260
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f533260

Branch: refs/heads/trunk
Commit: 1f533260a01552790aff0f5f2f8f2f0aee8dbf10
Parents: 1096f9f
Author: Sandeep Tamhankar <sa...@datastax.com>
Authored: Wed Apr 12 20:56:35 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Apr 12 20:56:35 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/native_protocol_v5.spec                     | 25 +++++-
 .../cassandra/cql3/BatchQueryOptions.java       |  5 ++
 .../CustomPayloadMirroringQueryHandler.java     |  5 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |  3 +-
 .../org/apache/cassandra/cql3/QueryOptions.java | 37 +++++---
 .../apache/cassandra/cql3/QueryProcessor.java   | 24 ++----
 .../cassandra/cql3/statements/CFStatement.java  |  8 +-
 .../cql3/statements/ParsedStatement.java        |  2 +-
 .../apache/cassandra/service/ClientState.java   | 24 ++++++
 .../org/apache/cassandra/transport/Client.java  |  4 +-
 .../cassandra/transport/SimpleClient.java       |  2 +-
 .../transport/messages/BatchMessage.java        |  3 +-
 .../transport/messages/PrepareMessage.java      | 43 +++++++++-
 .../org/apache/cassandra/cql3/CQLTester.java    |  2 +-
 .../cassandra/transport/MessagePayloadTest.java | 89 +++++++++++++++++++-
 .../cassandra/transport/SerDeserTest.java       | 32 +++++--
 17 files changed, 249 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4f3cb3b..dd33fcf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
  * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
  * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
  * Skip building views during base table streams on range movements (CASSANDRA-13065)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/doc/native_protocol_v5.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec
index ac3373c..320f6c0 100644
--- a/doc/native_protocol_v5.spec
+++ b/doc/native_protocol_v5.spec
@@ -332,7 +332,7 @@ Table of Contents
     <query><query_parameters>
   where <query> is a [long string] representing the query and
   <query_parameters> must be
-    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
+    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>][<keyspace>]
   where:
     - <consistency> is the [consistency] level for the operation.
     - <flags> is a [int] whose bits define the options for this query and
@@ -375,6 +375,9 @@ Table of Contents
               since the names for the expected values was returned during preparation,
               a client can always provide values in the right order without any names
               and using this flag, while supported, is almost surely inefficient.
+        0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+              [string] indicating the keyspace that the query should be executed in.
+              It supercedes the keyspace that the connection is bound to, if any.
 
   Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
   TRUNCATE, ...).
@@ -385,8 +388,17 @@ Table of Contents
 
 4.1.5. PREPARE
 
-  Prepare a query for later execution (through EXECUTE). The body consists of
-  the CQL query to prepare as a [long string].
+  Prepare a query for later execution (through EXECUTE). The body of the message must be:
+    <query><flags>[<keyspace>]
+  where:
+    - <query> is a [long string] representing the CQL query.
+    - <flags> is a [int] whose bits define the options for this statement and in particular
+      influence what the remainder of the message contains.
+      A flag is set if the bit corresponding to its `mask` is set. Supported
+      flags are, given their mask:
+        0x01: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+              [string] indicating the keyspace that the query should be executed in.
+              It supercedes the keyspace that the connection is bound to, if any.
 
   The server will respond with a RESULT message with a `prepared` kind (0x0004,
   see Section 4.2.5).
@@ -408,7 +420,7 @@ Table of Contents
   Allows executing a list of queries (prepared or not) as a batch (note that
   only DML statements are accepted in a batch). The body of the message must
   be:
-    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
+    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>][<keyspace>]
   where:
     - <type> is a [byte] indicating the type of batch to use:
         - If <type> == 0, the batch will be "logged". This is equivalent to a
@@ -440,6 +452,9 @@ Table of Contents
               to implement. This will be fixed in a future version of the native
               protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for
               more details].
+        0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a
+              [string] indicating the keyspace that the query should be executed in.
+              It supercedes the keyspace that the connection is bound to, if any.
     - <n> is a [short] indicating the number of following queries.
     - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
       form:
@@ -1211,3 +1226,5 @@ Table of Contents
   * Enlarged flag's bitmaps for QUERY, EXECUTE and BATCH messages from [byte] to [int]
     (Sections 4.1.4, 4.1.6 and 4.1.7).
   * Add the duration data type
+  * Added keyspace field in QUERY, PREPARE, and BATCH messages (Sections 4.1.4, 4.1.5, and 4.1.7).
+  * Added [int] flags field in PREPARE message (Section 4.1.5).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index db7fa39..3d3cda0 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -62,6 +62,11 @@ public abstract class BatchQueryOptions
         return wrapped.getConsistency();
     }
 
+    public String getKeyspace()
+    {
+        return wrapped.getKeyspace();
+    }
+
     public ConsistencyLevel getSerialConsistency()
     {
         return wrapped.getSerialConsistency();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
index aa8ca48..32cddba 100644
--- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.MD5Digest;
@@ -46,9 +47,9 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
         return result;
     }
 
-    public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, ByteBuffer> customPayload)
+    public ResultMessage.Prepared prepare(String query, ClientState clientState, Map<String, ByteBuffer> customPayload)
     {
-        ResultMessage.Prepared prepared = queryProcessor.prepare(query, state, customPayload);
+        ResultMessage.Prepared prepared = queryProcessor.prepare(query, clientState, customPayload);
         prepared.setCustomPayload(customPayload);
         return prepared;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 0339d26..d3b41f0 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.MD5Digest;
@@ -37,7 +38,7 @@ public interface QueryHandler
                           long queryStartNanoTime) throws RequestExecutionException, RequestValidationException;
 
     ResultMessage.Prepared prepare(String query,
-                                   QueryState state,
+                                   ClientState clientState,
                                    Map<String, ByteBuffer> customPayload) throws RequestValidationException;
 
     ParsedStatement.Prepared getPrepared(MD5Digest id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index afe20d7..01df691 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -67,9 +67,9 @@ public abstract class QueryOptions
         return new DefaultQueryOptions(null, null, true, null, protocolVersion);
     }
 
-    public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version)
+    public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version, String keyspace)
     {
-        return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), version);
+        return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L, keyspace), version);
     }
 
     public static QueryOptions addColumnSpecifications(QueryOptions options, List<ColumnSpecification> columnSpecs)
@@ -86,11 +86,11 @@ public abstract class QueryOptions
      *
      * This is functionally equivalent to:
      *   {@code Json.parseJson(UTF8Type.instance.getSerializer().deserialize(getValues().get(bindIndex)), expectedReceivers).get(columnName)}
-     * but this cache the result of parsing the JSON so that while this might be called for multiple columns on the same {@code bindIndex}
+     * but this caches the result of parsing the JSON, so that while this might be called for multiple columns on the same {@code bindIndex}
      * value, the underlying JSON value is only parsed/processed once.
      *
-     * Note: this is a bit more involved in CQL specifics than this class generally is but we as we need to cache this per-query and in an object
-     * that is available when we bind values, this is the easier place to have this.
+     * Note: this is a bit more involved in CQL specifics than this class generally is, but as we need to cache this per-query and in an object
+     * that is available when we bind values, this is the easiest place to have this.
      *
      * @param bindIndex the index of the bind value that should be interpreted as a JSON value.
      * @param columnName the name of the column we want the value of.
@@ -136,7 +136,7 @@ public abstract class QueryOptions
      *
      * <p>The column specifications will be present only for prepared statements.</p>
      *
-     * <p>Invoke the {@link hasColumnSpecifications} method before invoking this method in order to ensure that this
+     * <p>Invoke the {@link #hasColumnSpecifications} method before invoking this method in order to ensure that this
      * <code>QueryOptions</code> contains the column specifications.</p>
      *
      * @return the option names
@@ -172,6 +172,9 @@ public abstract class QueryOptions
         return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp();
     }
 
+    /** The keyspace that this query is bound to, or null if not relevant. */
+    public String getKeyspace() { return getSpecificOptions().keyspace; }
+
     /**
      * The protocol version for the query.
      */
@@ -314,7 +317,7 @@ public abstract class QueryOptions
         {
             super.prepare(specs);
 
-            orderedValues = new ArrayList<ByteBuffer>(specs.size());
+            orderedValues = new ArrayList<>(specs.size());
             for (int i = 0; i < specs.size(); i++)
             {
                 String name = specs.get(i).name.toString();
@@ -341,19 +344,21 @@ public abstract class QueryOptions
     // Options that are likely to not be present in most queries
     static class SpecificOptions
     {
-        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE);
+        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null);
 
         private final int pageSize;
         private final PagingState state;
         private final ConsistencyLevel serialConsistency;
         private final long timestamp;
+        private final String keyspace;
 
-        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
+        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp, String keyspace)
         {
             this.pageSize = pageSize;
             this.state = state;
             this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
             this.timestamp = timestamp;
+            this.keyspace = keyspace;
         }
     }
 
@@ -368,7 +373,8 @@ public abstract class QueryOptions
             PAGING_STATE,
             SERIAL_CONSISTENCY,
             TIMESTAMP,
-            NAMES_FOR_VALUES;
+            NAMES_FOR_VALUES,
+            KEYSPACE;
 
             private static final Flag[] ALL_VALUES = values();
 
@@ -433,8 +439,8 @@ public abstract class QueryOptions
                         throw new ProtocolException(String.format("Out of bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1, Long.MAX_VALUE, ts));
                     timestamp = ts;
                 }
-
-                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
+                String keyspace = flags.contains(Flag.KEYSPACE) ? CBUtil.readString(body) : null;
+                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace);
             }
             DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
             return names == null ? opts : new OptionsWithNames(opts, names);
@@ -460,6 +466,8 @@ public abstract class QueryOptions
                 CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
             if (flags.contains(Flag.TIMESTAMP))
                 dest.writeLong(options.getSpecificOptions().timestamp);
+            if (flags.contains(Flag.KEYSPACE))
+                CBUtil.writeString(options.getSpecificOptions().keyspace, dest);
 
             // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
             // and in fact we never really encode QueryOptions, only decode them, so we
@@ -485,7 +493,8 @@ public abstract class QueryOptions
                 size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
             if (flags.contains(Flag.TIMESTAMP))
                 size += 8;
-
+            if (flags.contains(Flag.KEYSPACE))
+                size += CBUtil.sizeOfString(options.getSpecificOptions().keyspace);
             return size;
         }
 
@@ -504,6 +513,8 @@ public abstract class QueryOptions
                 flags.add(Flag.SERIAL_CONSISTENCY);
             if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
                 flags.add(Flag.TIMESTAMP);
+            if (options.getSpecificOptions().keyspace != null)
+                flags.add(Flag.KEYSPACE);
             return flags;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 4aa2026..cca93ff 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -219,7 +219,7 @@ public class QueryProcessor implements QueryHandler
     public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
-        ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+        ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
         options.prepare(p.boundNames);
         CQLStatement prepared = p.statement;
         if (prepared.getBoundTerms() != options.getValues().size())
@@ -231,9 +231,9 @@ public class QueryProcessor implements QueryHandler
         return processStatement(prepared, queryState, options, queryStartNanoTime);
     }
 
-    public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+    public static ParsedStatement.Prepared parseStatement(String queryStr, ClientState clientState) throws RequestValidationException
     {
-        return getStatement(queryStr, queryState.getClientState());
+        return getStatement(queryStr, clientState);
     }
 
     public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
@@ -277,7 +277,7 @@ public class QueryProcessor implements QueryHandler
             return prepared;
 
         // Note: if 2 threads prepare the same query, we'll live so don't bother synchronizing
-        prepared = parseStatement(query, internalQueryState());
+        prepared = parseStatement(query, internalQueryState().getClientState());
         prepared.statement.validate(internalQueryState().getClientState());
         internalStatements.putIfAbsent(query, prepared);
         return prepared;
@@ -334,7 +334,7 @@ public class QueryProcessor implements QueryHandler
      */
     public static UntypedResultSet executeOnceInternal(String query, Object... values)
     {
-        ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState());
+        ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState().getClientState());
         prepared.statement.validate(internalQueryState().getClientState());
         ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values));
         if (result instanceof ResultMessage.Rows)
@@ -374,16 +374,10 @@ public class QueryProcessor implements QueryHandler
     }
 
     public ResultMessage.Prepared prepare(String query,
-                                          QueryState state,
+                                          ClientState clientState,
                                           Map<String, ByteBuffer> customPayload) throws RequestValidationException
     {
-        return prepare(query, state);
-    }
-
-    public ResultMessage.Prepared prepare(String queryString, QueryState queryState)
-    {
-        ClientState cState = queryState.getClientState();
-        return prepare(queryString, cState);
+        return prepare(query, clientState);
     }
 
     public static ResultMessage.Prepared prepare(String queryString, ClientState clientState)
@@ -485,7 +479,7 @@ public class QueryProcessor implements QueryHandler
     public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
-        ClientState clientState = queryState.getClientState();
+        ClientState clientState = queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace());
         batch.checkAccess(clientState);
         batch.validate();
         batch.validate(clientState);
@@ -500,7 +494,7 @@ public class QueryProcessor implements QueryHandler
 
         // Set keyspace for statement that require login
         if (statement instanceof CFStatement)
-            ((CFStatement)statement).prepareKeyspace(clientState);
+            ((CFStatement) statement).prepareKeyspace(clientState);
 
         Tracing.trace("Preparing statement");
         return statement.prepare();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
index 9b2987c..136860e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java
@@ -37,14 +37,16 @@ public abstract class CFStatement extends ParsedStatement
     {
         if (!cfName.hasKeyspace())
         {
-            // XXX: We explicitely only want to call state.getKeyspace() in this case, as we don't want to throw
-            // if not logged in any keyspace but a keyspace is explicitely set on the statement. So don't move
+            // XXX: We explicitly only want to call state.getKeyspace() in this case, as we don't want to throw
+            // if not logged in any keyspace but a keyspace is explicitly set on the statement. So don't move
             // the call outside the 'if' or replace the method by 'prepareKeyspace(state.getKeyspace())'
             cfName.setKeyspace(state.getKeyspace(), true);
         }
     }
 
-    // Only for internal calls, use the version with ClientState for user queries
+    // Only for internal calls, use the version with ClientState for user queries. In particular, the
+    // version with ClientState throws an exception if the statement does not have keyspace set *and*
+    // ClientState has no keyspace.
     public void prepareKeyspace(String keyspace)
     {
         if (!cfName.hasKeyspace())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index 0cd549a..e617ba7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@ -51,7 +51,7 @@ public abstract class ParsedStatement
         /**
          * Contains the CQL statement source if the statement has been "regularly" perpared via
          * {@link org.apache.cassandra.cql3.QueryProcessor#prepare(java.lang.String, org.apache.cassandra.service.ClientState)} /
-         * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.QueryState, java.util.Map)}.
+         * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.ClientState, java.util.Map)}.
          * Other usages of this class may or may not contain the CQL statement source.
          */
         public String rawCQLStatement;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 80cf810..dfddccd 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -139,6 +139,14 @@ public class ClientState
             this.user = AuthenticatedUser.ANONYMOUS_USER;
     }
 
+    protected ClientState(ClientState source)
+    {
+        this.isInternal = source.isInternal;
+        this.remoteAddress = source.remoteAddress;
+        this.user = source.user;
+        this.keyspace = source.keyspace;
+    }
+
     /**
      * @return a ClientState object for internal C* calls (not limited by any kind of auth).
      */
@@ -156,6 +164,22 @@ public class ClientState
     }
 
     /**
+     * Clone this ClientState object, but use the provided keyspace instead of the
+     * keyspace in this ClientState object.
+     *
+     * @return a new ClientState object if the keyspace argument is non-null. Otherwise do not clone
+     *   and return this ClientState object.
+     */
+    public ClientState cloneWithKeyspaceIfSet(String keyspace)
+    {
+        if (keyspace == null)
+            return this;
+        ClientState clientState = new ClientState(this);
+        clientState.setKeyspace(keyspace);
+        return clientState;
+    }
+
+    /**
      * This clock guarantees that updates for the same ClientState will be ordered
      * in the sequence seen, even if multiple updates happen in the same millisecond.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index e428b06..9a76e03 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -136,12 +136,12 @@ public class Client extends SimpleClient
                     return null;
                 }
             }
-            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version));
+            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version, null));
         }
         else if (msgType.equals("PREPARE"))
         {
             String query = line.substring(8);
-            return new PrepareMessage(query);
+            return new PrepareMessage(query, null);
         }
         else if (msgType.equals("EXECUTE"))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 1bb081b..13cd9bd 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -189,7 +189,7 @@ public class SimpleClient implements Closeable
 
     public ResultMessage.Prepared prepare(String query)
     {
-        Message.Response msg = execute(new PrepareMessage(query));
+        Message.Response msg = execute(new PrepareMessage(query, null));
         assert msg instanceof ResultMessage.Prepared;
         return (ResultMessage.Prepared)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index bb6411f..d9123d4 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -180,7 +180,8 @@ public class BatchMessage extends Message.Request
                 ParsedStatement.Prepared p;
                 if (query instanceof String)
                 {
-                    p = QueryProcessor.parseStatement((String)query, state);
+                    p = QueryProcessor.parseStatement((String)query,
+                                                      state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()));
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index b0c9dbe..bb0fc3a 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -36,26 +36,59 @@ public class PrepareMessage extends Message.Request
         public PrepareMessage decode(ByteBuf body, ProtocolVersion version)
         {
             String query = CBUtil.readLongString(body);
-            return new PrepareMessage(query);
+            String keyspace = null;
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) {
+                // If flags grows, we may want to consider creating a PrepareOptions class with an internal codec
+                // class that handles flags and options of the prepare message. Since there's only one right now,
+                // we just take care of business here.
+
+                int flags = (int)body.readUnsignedInt();
+                if ((flags & 0x1) == 0x1)
+                    keyspace = CBUtil.readString(body);
+            }
+            return new PrepareMessage(query, keyspace);
         }
 
         public void encode(PrepareMessage msg, ByteBuf dest, ProtocolVersion version)
         {
             CBUtil.writeLongString(msg.query, dest);
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+            {
+                // If we have no keyspace, write out a 0-valued flag field.
+                if (msg.keyspace == null)
+                    dest.writeInt(0x0);
+                else {
+                    dest.writeInt(0x1);
+                    CBUtil.writeString(msg.keyspace, dest);
+                }
+            }
         }
 
         public int encodedSize(PrepareMessage msg, ProtocolVersion version)
         {
-            return CBUtil.sizeOfLongString(msg.query);
+            int size = CBUtil.sizeOfLongString(msg.query);
+            if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
+            {
+                // We always emit a flags int
+                size += 4;
+
+                // If we have a keyspace, we'd write it out. Otherwise, we'd write nothing.
+                size += msg.keyspace == null
+                    ? 0
+                    : CBUtil.sizeOfString(msg.keyspace);
+            }
+            return size;
         }
     };
 
     private final String query;
+    private final String keyspace;
 
-    public PrepareMessage(String query)
+    public PrepareMessage(String query, String keyspace)
     {
         super(Message.Type.PREPARE);
         this.query = query;
+        this.keyspace = keyspace;
     }
 
     public Message.Response execute(QueryState state, long queryStartNanoTime)
@@ -75,7 +108,9 @@ public class PrepareMessage extends Message.Request
                 Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query));
             }
 
-            Message.Response response = ClientState.getCQLQueryHandler().prepare(query, state, getCustomPayload());
+            Message.Response response = ClientState.getCQLQueryHandler().prepare(query,
+                                                                                 state.getClientState().cloneWithKeyspaceIfSet(keyspace),
+                                                                                 getCustomPayload());
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index c247d96..26437c9 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -710,7 +710,7 @@ public abstract class CQLTester
             state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME);
             QueryState queryState = new QueryState(state);
 
-            ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState);
+            ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState.getClientState());
             prepared.statement.validate(state);
 
             QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index cfbfd39..c27593b 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.MD5Digest;
 
+import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class MessagePayloadTest extends CQLTester
@@ -112,6 +113,86 @@ public class MessagePayloadTest extends CQLTester
     }
 
     @Test
+    public void testMessagePayloadBeta() throws Throwable
+    {
+        QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null);
+        cqlQueryHandlerField.set(null, new TestQueryHandler());
+        try
+        {
+            requireNetwork();
+
+            Assert.assertSame(TestQueryHandler.class, ClientState.getCQLQueryHandler().getClass());
+
+            SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                   nativePort,
+                                                   ProtocolVersion.V5,
+                                                   true,
+                                                   new ClientEncryptionOptions());
+            try
+            {
+                client.connect(false);
+
+                Map<String, ByteBuffer> reqMap;
+                Map<String, ByteBuffer> respMap;
+
+                QueryOptions queryOptions = QueryOptions.create(
+                  QueryOptions.DEFAULT.getConsistency(),
+                  QueryOptions.DEFAULT.getValues(),
+                  QueryOptions.DEFAULT.skipMetadata(),
+                  QueryOptions.DEFAULT.getPageSize(),
+                  QueryOptions.DEFAULT.getPagingState(),
+                  QueryOptions.DEFAULT.getSerialConsistency(),
+                  ProtocolVersion.V5,
+                  KEYSPACE);
+                QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                             queryOptions);
+                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM atable", KEYSPACE);
+
+                reqMap = Collections.singletonMap("foo", bytes(42));
+                responsePayload = respMap = Collections.singletonMap("bar", bytes(42));
+                queryMessage.setCustomPayload(reqMap);
+                Message.Response queryResponse = client.execute(queryMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, queryResponse.getCustomPayload());
+
+                reqMap = Collections.singletonMap("foo", bytes(43));
+                responsePayload = respMap = Collections.singletonMap("bar", bytes(43));
+                prepareMessage.setCustomPayload(reqMap);
+                ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, prepareResponse.getCustomPayload());
+
+                ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT);
+                reqMap = Collections.singletonMap("foo", bytes(44));
+                responsePayload = respMap = Collections.singletonMap("bar", bytes(44));
+                executeMessage.setCustomPayload(reqMap);
+                Message.Response executeResponse = client.execute(executeMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, executeResponse.getCustomPayload());
+
+                BatchMessage batchMessage = new BatchMessage(BatchStatement.Type.UNLOGGED,
+                                                             Collections.<Object>singletonList("INSERT INTO atable (pk,v) VALUES (1, 'foo')"),
+                                                             Collections.singletonList(Collections.<ByteBuffer>emptyList()),
+                                                             queryOptions);
+                reqMap = Collections.singletonMap("foo", bytes(45));
+                responsePayload = respMap = Collections.singletonMap("bar", bytes(45));
+                batchMessage.setCustomPayload(reqMap);
+                Message.Response batchResponse = client.execute(batchMessage);
+                payloadEquals(reqMap, requestPayload);
+                payloadEquals(respMap, batchResponse.getCustomPayload());
+            }
+            finally
+            {
+                client.close();
+            }
+        }
+        finally
+        {
+            cqlQueryHandlerField.set(null, queryHandler);
+        }
+    }
+
+    @Test
     public void testMessagePayload() throws Throwable
     {
         QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null);
@@ -134,7 +215,7 @@ public class MessagePayloadTest extends CQLTester
                                                             "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
                                                             QueryOptions.DEFAULT
                 );
-                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
+                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null);
 
                 reqMap = Collections.singletonMap("foo", bytes(42));
                 responsePayload = respMap = Collections.singletonMap("bar", bytes(42));
@@ -202,7 +283,7 @@ public class MessagePayloadTest extends CQLTester
                                                             "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)",
                                                             QueryOptions.DEFAULT
                 );
-                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable");
+                PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null);
 
                 reqMap = Collections.singletonMap("foo", bytes(42));
                 responsePayload = Collections.singletonMap("bar", bytes(42));
@@ -293,13 +374,13 @@ public class MessagePayloadTest extends CQLTester
         }
 
         public ResultMessage.Prepared prepare(String query,
-                                              QueryState state,
+                                              ClientState clientState,
                                               Map<String, ByteBuffer> customPayload)
                                                       throws RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, state, customPayload);
+            ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, clientState, customPayload);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index a1260ba..2592ae7 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -314,15 +314,30 @@ public class SerDeserTest
 
     private void queryOptionsSerDeserTest(ProtocolVersion version) throws Exception
     {
-        QueryOptions options = QueryOptions.create(ConsistencyLevel.ALL,
-                                                   Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
-                                                   false,
-                                                   5000,
-                                                   Util.makeSomePagingState(version),
-                                                   ConsistencyLevel.SERIAL,
-                                                   version
-                                                   );
+        queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.ALL,
+                                                              Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })),
+                                                              false,
+                                                              5000,
+                                                              Util.makeSomePagingState(version),
+                                                              ConsistencyLevel.SERIAL,
+                                                              version,
+                                                              null
+        ));
+
+        queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.LOCAL_ONE,
+                                                              Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }),
+                                                                            ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })),
+                                                              true,
+                                                              10,
+                                                              Util.makeSomePagingState(version),
+                                                              ConsistencyLevel.SERIAL,
+                                                              version,
+                                                              "some_keyspace"
+        ));
+    }
 
+    private void queryOptionsSerDeserTest(ProtocolVersion version, QueryOptions options)
+    {
         ByteBuf buf = Unpooled.buffer(QueryOptions.codec.encodedSize(options, version));
         QueryOptions.codec.encode(options, buf, version);
         QueryOptions decodedOptions = QueryOptions.codec.decode(buf, version);
@@ -335,5 +350,6 @@ public class SerDeserTest
         assertEquals(options.getValues(), decodedOptions.getValues());
         assertEquals(options.getPagingState(), decodedOptions.getPagingState());
         assertEquals(options.skipMetadata(), decodedOptions.skipMetadata());
+        assertEquals(options.getKeyspace(), decodedOptions.getKeyspace());
     }
 }