You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:22:43 UTC

[4/4] git commit: Native protocol v3

Native protocol v3

patch by slebresne; reviewed by thobbs for CASSANDRA-6855


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9872b74e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9872b74e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9872b74e

Branch: refs/heads/cassandra-2.1
Commit: 9872b74ef20018e4e7645a8952fd7295e75764ad
Parents: ece3864
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Mar 12 18:58:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Apr 30 20:21:35 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 doc/native_protocol_v3.spec                     | 911 +++++++++++++++++++
 src/java/org/apache/cassandra/auth/Auth.java    |  16 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   6 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   4 +-
 .../org/apache/cassandra/cql3/Attributes.java   |   8 +-
 .../cassandra/cql3/BatchQueryOptions.java       |  81 +-
 .../apache/cassandra/cql3/ColumnCondition.java  |  32 +-
 .../org/apache/cassandra/cql3/Constants.java    |  20 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  34 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  32 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |   3 +-
 .../org/apache/cassandra/cql3/QueryOptions.java | 283 ++++--
 .../apache/cassandra/cql3/QueryProcessor.java   |  29 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   6 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |  26 +-
 src/java/org/apache/cassandra/cql3/Term.java    |  18 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   6 +-
 .../org/apache/cassandra/cql3/UserTypes.java    |  18 +-
 .../cassandra/cql3/functions/FunctionCall.java  |  20 +-
 .../cql3/statements/BatchStatement.java         |  95 +-
 .../cql3/statements/CQL3CasConditions.java      |  14 +-
 .../cql3/statements/ModificationStatement.java  |  63 +-
 .../cassandra/cql3/statements/Restriction.java  |  28 +-
 .../cql3/statements/SelectStatement.java        | 150 +--
 .../org/apache/cassandra/db/DefsTables.java     |  19 +-
 .../cassandra/db/marshal/CollectionType.java    |  29 +-
 .../apache/cassandra/db/marshal/ListType.java   |  12 +-
 .../apache/cassandra/db/marshal/MapType.java    |  21 +-
 .../apache/cassandra/db/marshal/SetType.java    |  15 +-
 .../apache/cassandra/db/marshal/UserType.java   |   5 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  11 +-
 .../serializers/CollectionSerializer.java       | 106 ++-
 .../cassandra/serializers/ListSerializer.java   |  39 +-
 .../cassandra/serializers/MapSerializer.java    |  48 +-
 .../cassandra/serializers/SetSerializer.java    |  39 +-
 .../cassandra/service/IMigrationListener.java   |   3 +
 .../cassandra/service/MigrationManager.java     |  18 +
 .../cassandra/thrift/CassandraServer.java       |   4 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  17 +
 .../org/apache/cassandra/transport/Client.java  |   4 +-
 .../apache/cassandra/transport/DataType.java    |  79 +-
 .../org/apache/cassandra/transport/Event.java   | 158 +++-
 .../apache/cassandra/transport/OptionCodec.java |  28 +-
 .../org/apache/cassandra/transport/Server.java  |  21 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/BatchMessage.java        |  53 +-
 .../transport/messages/EventMessage.java        |   6 +-
 .../transport/messages/ExecuteMessage.java      |   5 +-
 .../cassandra/transport/SerDeserTest.java       | 217 +++++
 52 files changed, 2232 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64e5afb..a4811f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,7 @@
  * Preemptive opening of compaction result (CASSANDRA-6916)
  * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
  * Optimize cellname comparison (CASSANDRA-6934)
+ * Native protocol v3 (CASSANDRA-6855)
 Merged from 2.0:
  * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
  * Set JMX RMI port to 7199 (CASSANDRA-7087)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d2b892e..ba29b37 100644
--- a/build.xml
+++ b/build.xml
@@ -387,7 +387,7 @@
           <dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
-          <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+          <dependency groupId="io.netty" artifactId="netty" version="4.0.17.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
new file mode 100644
index 0000000..6662f1c
--- /dev/null
+++ b/doc/native_protocol_v3.spec
@@ -0,0 +1,911 @@
+
+                             CQL BINARY PROTOCOL v3
+
+
+Table of Contents
+
+  1. Overview
+  2. Frame header
+    2.1. version
+    2.2. flags
+    2.3. stream
+    2.4. opcode
+    2.5. length
+  3. Notations
+  4. Messages
+    4.1. Requests
+      4.1.1. STARTUP
+      4.1.2. AUTH_RESPONSE
+      4.1.3. OPTIONS
+      4.1.4. QUERY
+      4.1.5. PREPARE
+      4.1.6. EXECUTE
+      4.1.7. BATCH
+      4.1.8. REGISTER
+    4.2. Responses
+      4.2.1. ERROR
+      4.2.2. READY
+      4.2.3. AUTHENTICATE
+      4.2.4. SUPPORTED
+      4.2.5. RESULT
+        4.2.5.1. Void
+        4.2.5.2. Rows
+        4.2.5.3. Set_keyspace
+        4.2.5.4. Prepared
+        4.2.5.5. Schema_change
+      4.2.6. EVENT
+      4.2.7. AUTH_CHALLENGE
+      4.2.8. AUTH_SUCCESS
+  5. Compression
+  6. Collection types
+  7. User Defined types
+  8. Result paging
+  9. Error codes
+  10. Changes from v2
+
+
+1. Overview
+
+  The CQL binary protocol is a frame based protocol. Frames are defined as:
+
+      0         8        16        24        32
+      +---------+---------+---------+---------+
+      | version |  flags  | stream  | opcode  |
+      +---------+---------+---------+---------+
+      |                length                 |
+      +---------+---------+---------+---------+
+      |                                       |
+      .            ...  body ...              .
+      .                                       .
+      .                                       .
+      +----------------------------------------
+
+  The protocol is big-endian (network byte order).
+
+  Each frame contains a fixed size header (8 bytes) followed by a variable size
+  body. The header is described in Section 2. The content of the body depends
+  on the header opcode value (the body can in particular be empty for some
+  opcode values). The list of allowed opcode is defined Section 2.3 and the
+  details of each corresponding message is described Section 4.
+
+  The protocol distinguishes 2 types of frames: requests and responses. Requests
+  are those frame sent by the clients to the server, response are the ones sent
+  by the server. Note however that the protocol supports server pushes (events)
+  so responses does not necessarily come right after a client request.
+
+  Note to client implementors: clients library should always assume that the
+  body of a given frame may contain more data than what is described in this
+  document. It will however always be safe to ignore the remaining of the frame
+  body in such cases. The reason is that this may allow to sometimes extend the
+  protocol with optional features without needing to change the protocol
+  version.
+
+
+
+2. Frame header
+
+2.1. version
+
+  The version is a single byte that indicate both the direction of the message
+  (request or response) and the version of the protocol in use. The up-most bit
+  of version is used to define the direction of the message: 0 indicates a
+  request, 1 indicates a responses. This can be useful for protocol analyzers to
+  distinguish the nature of the packet from the direction which it is moving.
+  The rest of that byte is the protocol version (3 for the protocol defined in
+  this document). In other words, for this version of the protocol, version will
+  have one of:
+    0x03    Request frame for this protocol version
+    0x83    Response frame for this protocol version
+
+  Please note that the while every message ship with the version, only one version
+  of messages is accepted on a given connection. In other words, the first message
+  exchanged (STARTUP) sets the version for the connection for the lifetime of this
+  connection.
+
+  This document describe the version 3 of the protocol. For the changes made since
+  version 2, see Section 10.
+
+
+2.2. flags
+
+  Flags applying to this frame. The flags have the following meaning (described
+  by the mask that allow to select them):
+    0x01: Compression flag. If set, the frame body is compressed. The actual
+          compression to use should have been set up beforehand through the
+          Startup message (which thus cannot be compressed; Section 4.1.1).
+    0x02: Tracing flag. For a request frame, this indicate the client requires
+          tracing of the request. Note that not all requests support tracing.
+          Currently, only QUERY, PREPARE and EXECUTE queries support tracing.
+          Other requests will simply ignore the tracing flag if set. If a
+          request support tracing and the tracing flag was set, the response to
+          this request will have the tracing flag set and contain tracing
+          information.
+          If a response frame has the tracing flag set, its body contains
+          a tracing ID. The tracing ID is a [uuid] and is the first thing in
+          the frame body. The rest of the body will then be the usual body
+          corresponding to the response opcode.
+
+  The rest of the flags is currently unused and ignored.
+
+2.3. stream
+
+  A frame has a stream id (one signed byte). When sending request messages, this
+  stream id must be set by the client to a positive byte (negative stream id
+  are reserved for streams initiated by the server; currently all EVENT messages
+  (section 4.2.6) have a streamId of -1). If a client sends a request message
+  with the stream id X, it is guaranteed that the stream id of the response to
+  that message will be X.
+
+  This allow to deal with the asynchronous nature of the protocol. If a client
+  sends multiple messages simultaneously (without waiting for responses), there
+  is no guarantee on the order of the responses. For instance, if the client
+  writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
+  respond to REQ_3 (or REQ_2) first. Assigning different stream id to these 3
+  requests allows the client to distinguish to which request an received answer
+  respond to. As there can only be 128 different simultaneous stream, it is up
+  to the client to reuse stream id.
+
+  Note that clients are free to use the protocol synchronously (i.e. wait for
+  the response to REQ_N before sending REQ_N+1). In that case, the stream id
+  can be safely set to 0. Clients should also feel free to use only a subset of
+  the 128 maximum possible stream ids if it is simpler for those
+  implementation.
+
+2.4. opcode
+
+  An integer byte that distinguish the actual message:
+    0x00    ERROR
+    0x01    STARTUP
+    0x02    READY
+    0x03    AUTHENTICATE
+    0x05    OPTIONS
+    0x06    SUPPORTED
+    0x07    QUERY
+    0x08    RESULT
+    0x09    PREPARE
+    0x0A    EXECUTE
+    0x0B    REGISTER
+    0x0C    EVENT
+    0x0D    BATCH
+    0x0E    AUTH_CHALLENGE
+    0x0F    AUTH_RESPONSE
+    0x10    AUTH_SUCCESS
+
+  Messages are described in Section 4.
+
+  (Note that there is no 0x04 message in this version of the protocol)
+
+
+2.5. length
+
+  A 4 byte integer representing the length of the body of the frame (note:
+  currently a frame is limited to 256MB in length).
+
+
+3. Notations
+
+  To describe the layout of the frame body for the messages in Section 4, we
+  define the following:
+
+    [int]          A 4 bytes integer
+    [long]         A 8 bytes integer
+    [short]        A 2 bytes unsigned integer
+    [string]       A [short] n, followed by n bytes representing an UTF-8
+                   string.
+    [long string]  An [int] n, followed by n bytes representing an UTF-8 string.
+    [uuid]         A 16 bytes long uuid.
+    [string list]  A [short] n, followed by n [string].
+    [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
+                   no byte should follow and the value represented is `null`.
+    [short bytes]  A [short] n, followed by n bytes if n >= 0.
+
+    [option]       A pair of <id><value> where <id> is a [short] representing
+                   the option id and <value> depends on that option (and can be
+                   of size 0). The supported id (and the corresponding <value>)
+                   will be described when this is used.
+    [option list]  A [short] n, followed by n [option].
+    [inet]         An address (ip and port) to a node. It consists of one
+                   [byte] n, that represents the address size, followed by n
+                   [byte] representing the IP address (in practice n can only be
+                   either 4 (IPv4) or 16 (IPv6)), following by one [int]
+                   representing the port.
+    [consistency]  A consistency level specification. This is a [short]
+                   representing a consistency level with the following
+                   correspondance:
+                     0x0000    ANY
+                     0x0001    ONE
+                     0x0002    TWO
+                     0x0003    THREE
+                     0x0004    QUORUM
+                     0x0005    ALL
+                     0x0006    LOCAL_QUORUM
+                     0x0007    EACH_QUORUM
+                     0x0008    SERIAL
+                     0x0009    LOCAL_SERIAL
+                     0x000A    LOCAL_ONE
+
+    [string map]      A [short] n, followed by n pair <k><v> where <k> and <v>
+                      are [string].
+    [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
+                      [string] and <v> is a [string list].
+
+
+4. Messages
+
+4.1. Requests
+
+  Note that outside of their normal responses (described below), all requests
+  can get an ERROR message (Section 4.2.1) as response.
+
+4.1.1. STARTUP
+
+  Initialize the connection. The server will respond by either a READY message
+  (in which case the connection is ready for queries) or an AUTHENTICATE message
+  (in which case credentials will need to be provided using AUTH_RESPONSE).
+
+  This must be the first message of the connection, except for OPTIONS that can
+  be sent before to find out the options supported by the server. Once the
+  connection has been initialized, a client should not send any more STARTUP
+  message.
+
+  The body is a [string map] of options. Possible options are:
+    - "CQL_VERSION": the version of CQL to use. This option is mandatory and
+      currenty, the only version supported is "3.0.0". Note that this is
+      different from the protocol version.
+    - "COMPRESSION": the compression algorithm to use for frames (See section 5).
+      This is optional, if not specified no compression will be used.
+
+
+4.1.2. AUTH_RESPONSE
+
+  Answers a server authentication challenge.
+
+  Authentication in the protocol is SASL based. The server sends authentication
+  challenges (a bytes token) to which the client answer with this message. Those
+  exchanges continue until the server accepts the authentication by sending a
+  AUTH_SUCCESS message after a client AUTH_RESPONSE. It is however that client that
+  initiate the exchange by sending an initial AUTH_RESPONSE in response to a
+  server AUTHENTICATE request.
+
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depends on the actual
+  authenticator used.
+
+  The response to a AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
+  an AUTH_SUCCESS message or an ERROR message.
+
+
+4.1.3. OPTIONS
+
+  Asks the server to return what STARTUP options are supported. The body of an
+  OPTIONS message should be empty and the server will respond with a SUPPORTED
+  message.
+
+
+4.1.4. QUERY
+
+  Performs a CQL query. The body of the message must be:
+    <query><query_parameters>
+  where <query> is a [long string] representing the query and
+  <query_parameters> must be
+    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
+  where:
+    - <consistency> is the [consistency] level for the operation.
+    - <flags> is a [byte] whose bits define the options for this query and
+      in particular influence what the remainder of the message contains.
+      A flag is set if the bit corresponding to its `mask` is set. Supported
+      flags are, given there mask:
+        0x01: Values. In that case, a [short] <n> followed by <n> [bytes]
+              values are provided. Those value are used for bound variables in
+              the query. Optionally, if the 0x40 flag is present, each value
+              will be preceded by a [string] name, representing the name of
+              the marker the value must be binded to. This is optional, and
+              if not present, values will be binded by position.
+        0x02: Skip_metadata. If present, the Result Set returned as a response
+              to that query (if any) will have the NO_METADATA flag (see
+              Section 4.2.5.2).
+        0x04: Page_size. In that case, <result_page_size> is an [int]
+              controlling the desired page size of the result (in CQL3 rows).
+              See the section on paging (Section 7) for more details.
+        0x08: With_paging_state. If present, <paging_state> should be present.
+              <paging_state> is a [bytes] value that should have been returned
+              in a result set (Section 4.2.5.2). If provided, the query will be
+              executed but starting from a given paging state. This also to
+              continue paging on a different node from the one it has been
+              started (See Section 7 for more details).
+        0x10: With serial consistency. If present, <serial_consistency> should be
+              present. <serial_consistency> is the [consistency] level for the
+              serial phase of conditional updates. That consitency can only be
+              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+              SERIAL. This option will be ignored for anything else that a
+              conditional update/insert.
+        0x20: With default timestamp. If present, <timestamp> should be present.
+              <timestamp> is a [long] representing the default timestamp for the query
+              in microseconds (negative values are forbidden). If provided, this will
+              replace the server side assigned timestamp as default timestamp.
+              Note that a timestamp in the query itself will still override
+              this timestamp. This is entirely optional.
+        0x40: With names for values. This only makes sense if the 0x01 flag is set and
+              is ignored otherwise. If present, the values from the 0x01 flag will
+              be preceded by a name (see above). Note that this is only useful for
+              QUERY requests where named bind markers are used; for EXECUTE statements,
+              since the names for the expected values was returned during preparation,
+              a client can always provide values in the right order without any names
+              and using this flag, while supported, is almost surely inefficient.
+
+  Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
+  TRUNCATE, ...).
+
+  The server will respond to a QUERY message with a RESULT message, the content
+  of which depends on the query.
+
+
+4.1.5. PREPARE
+
+  Prepare a query for later execution (through EXECUTE). The body consists of
+  the CQL query to prepare as a [long string].
+
+  The server will respond with a RESULT message with a `prepared` kind (0x0004,
+  see Section 4.2.5).
+
+
+4.1.6. EXECUTE
+
+  Executes a prepared query. The body of the message must be:
+    <id><query_parameters>
+  where <id> is the prepared query ID. It's the [short bytes] returned as a
+  response to a PREPARE message. As for <query_parameters>, it has the exact
+  same definition than in QUERY (see Section 4.1.4).
+
+  The response from the server will be a RESULT message.
+
+
+4.1.7. BATCH
+
+  Allows executing a list of queries (prepared or not) as a batch (note that
+  only DML statements are accepted in a batch). The body of the message must
+  be:
+    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
+  where:
+    - <type> is a [byte] indicating the type of batch to use:
+        - If <type> == 0, the batch will be "logged". This is equivalent to a
+          normal CQL3 batch statement.
+        - If <type> == 1, the batch will be "unlogged".
+        - If <type> == 2, the batch will be a "counter" batch (and non-counter
+          statements will be rejected).
+    - <flags> is a [byte] whose bits define the options for this query and
+      in particular influence the remainder of the message contains. It is similar
+      to the <flags> from QUERY and EXECUTE methods, except that the 4 rightmost
+      bits must always be 0 as their corresponding option do not make sense for
+      Batch. A flag is set if the bit corresponding to its `mask` is set. Supported
+      flags are, given there mask:
+        0x10: With serial consistency. If present, <serial_consistency> should be
+              present. <serial_consistency> is the [consistency] level for the
+              serial phase of conditional updates. That consitency can only be
+              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+              SERIAL. This option will be ignored for anything else that a
+              conditional update/insert.
+        0x20: With default timestamp. If present, <timestamp> should be present.
+              <timestamp> is a [long] representing the default timestamp for the query
+              in microseconds. If provided, this will replace the server side assigned
+              timestamp as default timestamp. Note that a timestamp in the query itself
+              will still override this timestamp. This is entirely optional.
+        0x40: With names for values. If set, then all values for all <query_i> must be
+              preceded by a [string] <name_i> that have the same meaning as in QUERY
+              requests.
+    - <n> is a [short] indicating the number of following queries.
+    - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
+      form:
+        <kind><string_or_id><n>[<name_1>]<value_1>...[<name_n>]<value_n>
+      where:
+       - <kind> is a [byte] indicating whether the following query is a prepared
+         one or not. <kind> value must be either 0 or 1.
+       - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be
+         a [long string] query string (as in QUERY, the query string might contain
+         bind markers). Otherwise (that is, if <kind> == 1), it should be a
+         [short bytes] representing a prepared query ID.
+       - <n> is a [short] indicating the number (possibly 0) of following values.
+       - <name_i> is the optional name of the following <value_i>. It must be present
+         if and only if the 0x40 flag is provided for the batch.
+       - <value_i> is the [bytes] to use for bound variable i (of bound variable <name_i>
+         if the 0x40 flag is used).
+    - <consistency> is the [consistency] level for the operation.
+    - <serial_consistency> is only present if the 0x10 flag is set. In that case,
+      <serial_consistency> is the [consistency] level for the serial phase of
+      conditional updates. That consitency can only be either SERIAL or
+      LOCAL_SERIAL and if not present will defaults to SERIAL. This option will
+      be ignored for anything else that a conditional update/insert.
+
+  The server will respond with a RESULT message.
+
+
+4.1.8. REGISTER
+
+  Register this connection to receive some type of events. The body of the
+  message is a [string list] representing the event types to register to. See
+  section 4.2.6 for the list of valid event types.
+
+  The response to a REGISTER message will be a READY message.
+
+  Please note that if a client driver maintains multiple connections to a
+  Cassandra node and/or connections to multiple nodes, it is advised to
+  dedicate a handful of connections to receive events, but to *not* register
+  for events on all connections, as this would only result in receiving
+  multiple times the same event messages, wasting bandwidth.
+
+
+4.2. Responses
+
+  This section describes the content of the frame body for the different
+  responses. Please note that to make room for future evolution, clients should
+  support extra informations (that they should simply discard) to the one
+  described in this document at the end of the frame body.
+
+4.2.1. ERROR
+
+  Indicates an error processing a request. The body of the message will be an
+  error code ([int]) followed by a [string] error message. Then, depending on
+  the exception, more content may follow. The error codes are defined in
+  Section 8, along with their additional content if any.
+
+
+4.2.2. READY
+
+  Indicates that the server is ready to process queries. This message will be
+  sent by the server either after a STARTUP message if no authentication is
+  required, or after a successful CREDENTIALS message.
+
+  The body of a READY message is empty.
+
+
+4.2.3. AUTHENTICATE
+
+  Indicates that the server require authentication, and which authentication
+  mechanism to use.
+
+  The authentication is SASL based and thus consists on a number of server
+  challenges (AUTH_CHALLENGE, Section 4.2.7) followed by client responses
+  (AUTH_RESPONSE, Section 4.1.2). The Initial exchange is however boostrapped
+  by an initial client response. The details of that exchange (including how
+  much challenge-response pair are required) are specific to the authenticator
+  in use. The exchange ends when the server sends an AUTH_SUCCESS message or
+  an ERROR message.
+
+  This message will be sent following a STARTUP message if authentication is
+  required and must be answered by a AUTH_RESPONSE message from the client.
+
+  The body consists of a single [string] indicating the full class name of the
+  IAuthenticator in use.
+
+
+4.2.4. SUPPORTED
+
+  Indicates which startup options are supported by the server. This message
+  comes as a response to an OPTIONS message.
+
+  The body of a SUPPORTED message is a [string multimap]. This multimap gives
+  for each of the supported STARTUP options, the list of supported values.
+
+
+4.2.5. RESULT
+
+  The result to a query (QUERY, PREPARE, EXECUTE or BATCH messages).
+
+  The first element of the body of a RESULT message is an [int] representing the
+  `kind` of result. The rest of the body depends on the kind. The kind can be
+  one of:
+    0x0001    Void: for results carrying no information.
+    0x0002    Rows: for results to select queries, returning a set of rows.
+    0x0003    Set_keyspace: the result to a `use` query.
+    0x0004    Prepared: result to a PREPARE message.
+    0x0005    Schema_change: the result to a schema altering query.
+
+  The body for each kind (after the [int] kind) is defined below.
+
+
+4.2.5.1. Void
+
+  The rest of the body for a Void result is empty. It indicates that a query was
+  successful without providing more information.
+
+
+4.2.5.2. Rows
+
+  Indicates a set of rows. The rest of body of a Rows result is:
+    <metadata><rows_count><rows_content>
+  where:
+    - <metadata> is composed of:
+        <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+      where:
+        - <flags> is an [int]. The bits of <flags> provides information on the
+          formatting of the remaining informations. A flag is set if the bit
+          corresponding to its `mask` is set. Supported flags are, given there
+          mask:
+            0x0001    Global_tables_spec: if set, only one table spec (keyspace
+                      and table name) is provided as <global_table_spec>. If not
+                      set, <global_table_spec> is not present.
+            0x0002    Has_more_pages: indicates whether this is not the last
+                      page of results and more should be retrieve. If set, the
+                      <paging_state> will be present. The <paging_state> is a
+                      [bytes] value that should be used in QUERY/EXECUTE to
+                      continue paging and retrieve the remained of the result for
+                      this query (See Section 7 for more details).
+            0x0004    No_metadata: if set, the <metadata> is only composed of
+                      these <flags>, the <column_count> and optionally the
+                      <paging_state> (depending on the Has_more_pages flage) but
+                      no other information (so no <global_table_spec> nor <col_spec_i>).
+                      This will only ever be the case if this was requested
+                      during the query (see QUERY and RESULT messages).
+        - <columns_count> is an [int] representing the number of columns selected
+          by the query this result is of. It defines the number of <col_spec_i>
+          elements in and the number of element for each row in <rows_content>.
+        - <global_table_spec> is present if the Global_tables_spec is set in
+          <flags>. If present, it is composed of two [string] representing the
+          (unique) keyspace name and table name the columns return are of.
+        - <col_spec_i> specifies the columns returned in the query. There is
+          <column_count> such column specifications that are composed of:
+            (<ksname><tablename>)?<name><type>
+          The initial <ksname> and <tablename> are two [string] are only present
+          if the Global_tables_spec flag is not set. The <column_name> is a
+          [string] and <type> is an [option] that correspond to the description
+          (what this description is depends a bit on the context: in results to
+          selects, this will be either the user chosen alias or the selection used
+          (often a colum name, but it can be a function call too). In results to
+          a PREPARE, this will be either the name of the bind variable corresponding
+          or the column name for the variable if it is "anonymous") and type of
+          the corresponding result. The option for <type> is either a native
+          type (see below), in which case the option has no value, or a
+          'custom' type, in which case the value is a [string] representing
+          the full qualified class name of the type represented. Valid option
+          ids are:
+            0x0000    Custom: the value is a [string], see above.
+            0x0001    Ascii
+            0x0002    Bigint
+            0x0003    Blob
+            0x0004    Boolean
+            0x0005    Counter
+            0x0006    Decimal
+            0x0007    Double
+            0x0008    Float
+            0x0009    Int
+            0x000B    Timestamp
+            0x000C    Uuid
+            0x000D    Varchar
+            0x000E    Varint
+            0x000F    Timeuuid
+            0x0010    Inet
+            0x0020    List: the value is an [option], representing the type
+                            of the elements of the list.
+            0x0021    Map: the value is two [option], representing the types of the
+                           keys and values of the map
+            0x0022    Set: the value is an [option], representing the type
+                            of the elements of the set
+            0x0030    UDT: the value is <ks><udt_name><n><name_1><type_1>...<name_n><type_n>
+                           where:
+                              - <ks> is a [string] representing the keyspace name this
+                                UDT is part of.
+                              - <udt_name> is a [string] representing the UDT name.
+                              - <n> is a [short] reprensenting the number of fields of
+                                the UDT, and thus the number of <name_i><type_i> pair
+                                following
+                              - <name_i> is a [string] representing the name of the
+                                i_th field of the UDT.
+                              - <type_i> is an [option] representing the type of the
+                                i_th field of the UDT.
+
+    - <rows_count> is an [int] representing the number of rows present in this
+      result. Those rows are serialized in the <rows_content> part.
+    - <rows_content> is composed of <row_1>...<row_m> where m is <rows_count>.
+      Each <row_i> is composed of <value_1>...<value_n> where n is
+      <columns_count> and where <value_j> is a [bytes] representing the value
+      returned for the jth column of the ith row. In other words, <rows_content>
+      is composed of (<rows_count> * <columns_count>) [bytes].
+
+
+4.2.5.3. Set_keyspace
+
+  The result to a `use` query. The body (after the kind [int]) is a single
+  [string] indicating the name of the keyspace that has been set.
+
+
+4.2.5.4. Prepared
+
+  The result to a PREPARE message. The rest of the body of a Prepared result is:
+    <id><metadata><result_metadata>
+  where:
+    - <id> is [short bytes] representing the prepared query ID.
+    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2; you
+      can however assume that the Has_more_pages flag is always off) and
+      is the specification for the variable bound in this prepare statement.
+    - <result_metadata> is defined exactly as <metadata> but correspond to the
+      metadata for the resultSet that execute this query will yield. Note that
+      <result_metadata> may be empty (have the No_metadata flag and 0 columns, See
+      section 4.2.5.2) and will be for any query that is not a Select. There is
+      in fact never a guarantee that this will non-empty so client should protect
+      themselves accordingly. The presence of this information is an
+      optimization that allows to later execute the statement that has been
+      prepared without requesting the metadata (Skip_metadata flag in EXECUTE).
+      Clients can safely discard this metadata if they do not want to take
+      advantage of that optimization.
+
+  Note that prepared query ID return is global to the node on which the query
+  has been prepared. It can be used on any connection to that node and this
+  until the node is restarted (after which the query must be reprepared).
+
+4.2.5.5. Schema_change
+
+  The result to a schema altering query (creation/update/drop of a
+  keyspace/table/index). The body (after the kind [int]) is composed of 3
+  [string]:
+    <change><keyspace><table>
+  where:
+    - <change> describe the type of change that has occured. It can be one of
+      "CREATED", "UPDATED" or "DROPPED".
+    - <keyspace> is the name of the affected keyspace or the keyspace of the
+      affected table.
+    - <table> is the name of the affected table. <table> will be empty (i.e.
+      the empty string "") if the change was affecting a keyspace and not a
+      table.
+
+  Note that queries to create and drop an index are considered as change
+  updating the table the index is on.
+
+
+4.2.6. EVENT
+
+  And event pushed by the server. A client will only receive events for the
+  type it has REGISTER to. The body of an EVENT message will start by a
+  [string] representing the event type. The rest of the message depends on the
+  event type. The valid event types are:
+    - "TOPOLOGY_CHANGE": events related to change in the cluster topology.
+      Currently, events are sent when new nodes are added to the cluster, and
+      when nodes are removed. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of
+      the new/removed node.
+    - "STATUS_CHANGE": events related to change of node status. Currently,
+      up/down events are sent. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of status change ("UP" or "DOWN") followed by the address of the
+      concerned node.
+    - "SCHEMA_CHANGE": events related to schema change. After the event type,
+      the rest of the message will be <change_type><target><options> where:
+        - <change_type> is the type of changed involved. It will be one of
+          "CREATED", "UPDATED" or "DROPPED".
+        - <target> can be one of "KEYSPACE", "TABLE" or "TYPE" and describes
+          what has been modified ("TYPE" stands for modifications related to
+          user types).
+        - <options> depends on the preceding <target>. If <target> is
+          "KEYSPACE", then <options> will be a single [string] representing the
+          keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then
+          <options> will be 2 [string]: the first one will be the keyspace
+          containing the affected object, and the second one will be the name
+          of said affected object (so either the table name or the user type
+          name).
+
+  All EVENT message have a streamId of -1 (Section 2.3).
+
+  Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
+  communication and as such may be sent a short delay before the binary
+  protocol server on the newly up node is fully started. Clients are thus
+  advise to wait a short time before trying to connect to the node (1 seconds
+  should be enough), otherwise they may experience a connection refusal at
+  first.
+
+4.2.7. AUTH_CHALLENGE
+
+  A server authentication challenge (see AUTH_RESPONSE (Section 4.1.2) for more
+  details).
+
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depends on the actual
+  authenticator used.
+
+  Clients are expected to answer the server challenge by an AUTH_RESPONSE
+  message.
+
+4.2.7. AUTH_SUCCESS
+
+  Indicate the success of the authentication phase. See Section 4.2.3 for more
+  details.
+
+  The body of this message is a single [bytes] token holding final information
+  from the server that the client may require to finish the authentication
+  process. What that token contains and whether it can be null depends on the
+  actual authenticator used.
+
+
+5. Compression
+
+  Frame compression is supported by the protocol, but then only the frame body
+  is compressed (the frame header should never be compressed).
+
+  Before being used, client and server must agree on a compression algorithm to
+  use, which is done in the STARTUP message. As a consequence, a STARTUP message
+  must never be compressed.  However, once the STARTUP frame has been received
+  by the server can be compressed (including the response to the STARTUP
+  request). Frame do not have to be compressed however, even if compression has
+  been agreed upon (a server may only compress frame above a certain size at its
+  discretion). A frame body should be compressed if and only if the compressed
+  flag (see Section 2.2) is set.
+
+  As of this version 2 of the protocol, the following compressions are available:
+    - lz4 (https://code.google.com/p/lz4/). In that, note that the 4 first bytes
+      of the body will be the uncompressed length (followed by the compressed
+      bytes).
+    - snappy (https://code.google.com/p/snappy/). This compression might not be
+      available as it depends on a native lib (server-side) that might not be
+      avaivable on some installation.
+
+
+6. Collection types
+
+  This section describe the serialization format for the collection types:
+  list, map and set. This serialization format is both useful to decode values
+  returned in RESULT messages but also to encode values for EXECUTE ones.
+
+  The serialization formats are:
+     List: a [int] n indicating the size of the list, followed by n elements.
+           Each element is [bytes] representing the serialized element
+           value.
+     Map: a [int] n indicating the size of the map, followed by n entries.
+          Each entry is composed of two [bytes] representing the key and
+          the value of the entry map.
+     Set: a [int] n indicating the size of the set, followed by n elements.
+          Each element is [bytes] representing the serialized element
+          value.
+
+
+7. User defined types
+
+  This section describe the serialization format for User defined types (UDT) values.
+  UDT values are the values of the User Defined Types as defined in section 4.2.5.2.
+
+  A UDT value is a [short] n indicating the number of values (field) of UDT values
+  followed by n elements. Each element is a [short bytes] representing the serialized
+  field.
+
+
+8. Result paging
+
+  The protocol allows for paging the result of queries. For that, the QUERY and
+  EXECUTE messages have a <result_page_size> value that indicate the desired
+  page size in CQL3 rows.
+
+  If a positive value is provided for <result_page_size>, the result set of the
+  RESULT message returned for the query will contain at most the
+  <result_page_size> first rows of the query result. If that first page of result
+  contains the full result set for the query, the RESULT message (of kind `Rows`)
+  will have the Has_more_pages flag *not* set. However, if some results are not
+  part of the first response, the Has_more_pages flag will be set and the result
+  will contain a <paging_state> value. In that case, the <paging_state> value
+  should be used in a QUERY or EXECUTE message (that has the *same* query than
+  the original one or the behavior is undefined) to retrieve the next page of
+  results.
+
+  Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+  support paging. For other type of queries, the <result_page_size> value is
+  ignored.
+
+  Note to client implementors:
+  - While <result_page_size> can be as low as 1, it will likely be detrimental
+    to performance to pick a value too low. A value below 100 is probably too
+    low for most use cases.
+  - Clients should not rely on the actual size of the result set returned to
+    decide if there is more result to fetch or not. Instead, they should always
+    check the Has_more_pages flag (unless they did not enabled paging for the query
+    obviously). Clients should also not assert that no result will have more than
+    <result_page_size> results. While the current implementation always respect
+    the exact value of <result_page_size>, we reserve ourselves the right to return
+    slightly smaller or bigger pages in the future for performance reasons.
+
+
+9. Error codes
+
+  The supported error codes are described below:
+    0x0000    Server error: something unexpected happened. This indicates a
+              server-side bug.
+    0x000A    Protocol error: some client message triggered a protocol
+              violation (for instance a QUERY message is sent before a STARTUP
+              one has been sent)
+    0x0100    Bad credentials: CREDENTIALS request failed because Cassandra
+              did not accept the provided credentials.
+
+    0x1000    Unavailable exception. The rest of the ERROR message body will be
+                <cl><required><alive>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <required> is an [int] representing the number of node that
+                           should be alive to respect <cl>
+                <alive> is an [int] representing the number of replica that
+                        were known to be alive when the request has been
+                        processed (since an unavailable exception has been
+                        triggered, there will be <alive> < <required>)
+    0x1001    Overloaded: the request cannot be processed because the
+              coordinator node is overloaded
+    0x1002    Is_bootstrapping: the request was a read request but the
+              coordinator node is bootstrapping
+    0x1003    Truncate_error: error during a truncation error.
+    0x1100    Write_timeout: Timeout exception during a write request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><writeType>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           acknowledged the request.
+                <blockfor> is the number of replica whose acknowledgement is
+                           required to achieve <cl>.
+                <writeType> is a [string] that describe the type of the write
+                            that timeouted. The value of that string can be one
+                            of:
+                             - "SIMPLE": the write was a non-batched
+                               non-counter write.
+                             - "BATCH": the write was a (logged) batch write.
+                               If this type is received, it means the batch log
+                               has been successfully written (otherwise a
+                               "BATCH_LOG" type would have been send instead).
+                             - "UNLOGGED_BATCH": the write was an unlogged
+                               batch. Not batch log write has been attempted.
+                             - "COUNTER": the write was a counter write
+                               (batched or not).
+                             - "BATCH_LOG": the timeout occured during the
+                               write to the batch log when a (logged) batch
+                               write was requested.
+    0x1200    Read_timeout: Timeout exception during a read request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><data_present>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           answered the request.
+                <blockfor> is the number of replica whose response is
+                           required to achieve <cl>. Please note that it is
+                           possible to have <received> >= <blockfor> if
+                           <data_present> is false. And also in the (unlikely)
+                           case were <cl> is achieved but the coordinator node
+                           timeout while waiting for read-repair
+                           acknowledgement.
+                <data_present> is a single byte. If its value is 0, it means
+                               the replica that was asked for data has not
+                               responded. Otherwise, the value is != 0.
+
+    0x2000    Syntax_error: The submitted query has a syntax error.
+    0x2100    Unauthorized: The logged user doesn't have the right to perform
+              the query.
+    0x2200    Invalid: The query is syntactically correct but invalid.
+    0x2300    Config_error: The query is invalid because of some configuration issue
+    0x2400    Already_exists: The query attempted to create a keyspace or a
+              table that was already existing. The rest of the ERROR message
+              body will be <ks><table> where:
+                <ks> is a [string] representing either the keyspace that
+                     already exists, or the keyspace in which the table that
+                     already exists is.
+                <table> is a [string] representing the name of the table that
+                        already exists. If the query was attempting to create a
+                        keyspace, <table> will be present but will be the empty
+                        string.
+    0x2500    Unprepared: Can be thrown while a prepared statement tries to be
+              executed if the provide prepared statement ID is not known by
+              this host. The rest of the ERROR message body will be [short
+              bytes] representing the unknown ID.
+
+10. Changes from v2
+  * BATCH messages now have <flags> (like QUERY and EXECUTE) and a corresponding optional
+    <serial_consistency> parameters (see Section 4.1.7).
+  * User Defined Types have to added to ResultSet metadata (see 4.2.5.2) and a new section
+    on the serialization format of UDT values has been added to the documentation
+    (Section 7).
+  * The serialization format for collection has changed (both the collection size and
+    the length of each argument is now 4 bytes long). See Section 6.
+  * QUERY, EXECUTE and BATCH messages can now optionally provide the default timestamp for the query.
+    As this feature is optionally enabled by clients, implementing it is at the discretion of the
+    client.
+  * QUERY, EXECUTE and BATCH messages can now optionally provide the names for the values of the
+    query. As this feature is optionally enabled by clients, implementing it is at the discretion of the
+    client.
+  * The format of "SCHEMA_CHANGE" notifications has been modified, and now includes changes related to
+    user types.
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 237fc99..528a54a 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -258,8 +258,8 @@ public class Auth
         try
         {
             ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(),
-                                                                  new QueryOptions(consistencyForUser(username),
-                                                                                   Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                  QueryOptions.forInternalCalls(consistencyForUser(username),
+                                                                                                Lists.newArrayList(ByteBufferUtil.bytes(username))));
             return UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)
@@ -287,6 +287,10 @@ public class Auth
             DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
         }
 
+        public void onDropUserType(String ksName, String userType)
+        {
+        }
+
         public void onCreateKeyspace(String ksName)
         {
         }
@@ -295,6 +299,10 @@ public class Auth
         {
         }
 
+        public void onCreateUserType(String ksName, String userType)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
         }
@@ -302,5 +310,9 @@ public class Auth
         public void onUpdateColumnFamily(String ksName, String cfName)
         {
         }
+
+        public void onUpdateUserType(String ksName, String userType)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 85d2b16..b37dee2 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -71,9 +71,9 @@ public class CassandraAuthorizer implements IAuthorizer
         try
         {
             ResultMessage.Rows rows = authorizeStatement.execute(QueryState.forInternalCalls(),
-                                                                 new QueryOptions(ConsistencyLevel.ONE,
-                                                                                  Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
-                                                                                                     ByteBufferUtil.bytes(resource.getName()))));
+                                                                 QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                                               Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
+                                                                                                                  ByteBufferUtil.bytes(resource.getName()))));
             result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 1567bde..9256c2b 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -106,8 +106,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
         try
         {
             ResultMessage.Rows rows = authenticateStatement.execute(QueryState.forInternalCalls(),
-                                                                    new QueryOptions(consistencyForUser(username),
-                                                                                     Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                    QueryOptions.forInternalCalls(consistencyForUser(username),
+                                                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username))));
             result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 7c33e5b..435757b 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -56,12 +56,12 @@ public class Attributes
         return timeToLive != null;
     }
 
-    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+    public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
         if (timestamp == null)
             return now;
 
-        ByteBuffer tval = timestamp.bindAndGet(variables);
+        ByteBuffer tval = timestamp.bindAndGet(options);
         if (tval == null)
             throw new InvalidRequestException("Invalid null value of timestamp");
 
@@ -77,12 +77,12 @@ public class Attributes
         return LongType.instance.compose(tval);
     }
 
-    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+    public int getTimeToLive(QueryOptions options) throws InvalidRequestException
     {
         if (timeToLive == null)
             return 0;
 
-        ByteBuffer tval = timeToLive.bindAndGet(variables);
+        ByteBuffer tval = timeToLive.bindAndGet(options);
         if (tval == null)
             throw new InvalidRequestException("Invalid null value of TTL");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index cbf5e92..2bb8071 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -18,38 +18,95 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 
-/**
- * Options for a batch (at the protocol level) queries.
- */
-public class BatchQueryOptions
+public abstract class BatchQueryOptions
 {
-    private final ConsistencyLevel consistency;
-    private final List<List<ByteBuffer>> values;
+    public static BatchQueryOptions DEFAULT = withoutPerStatementVariables(QueryOptions.DEFAULT);
+
+    protected final QueryOptions wrapped;
     private final List<Object> queryOrIdList;
 
-    public BatchQueryOptions(ConsistencyLevel cl, List<List<ByteBuffer>> values, List<Object> queryOrIdList)
+    protected BatchQueryOptions(QueryOptions wrapped, List<Object> queryOrIdList)
     {
-        this.consistency = cl;
-        this.values = values;
+        this.wrapped = wrapped;
         this.queryOrIdList = queryOrIdList;
     }
 
+    public static BatchQueryOptions withoutPerStatementVariables(QueryOptions options)
+    {
+        return new WithoutPerStatementVariables(options, Collections.<Object>emptyList());
+    }
+
+    public static BatchQueryOptions withPerStatementVariables(QueryOptions options, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+    {
+        return new WithPerStatementVariables(options, variables, queryOrIdList);
+    }
+
+    public abstract QueryOptions forStatement(int i);
+
     public ConsistencyLevel getConsistency()
     {
-        return consistency;
+        return wrapped.getConsistency();
     }
 
-    public List<List<ByteBuffer>> getValues()
+    public ConsistencyLevel getSerialConsistency()
     {
-        return values;
+        return wrapped.getSerialConsistency();
     }
 
     public List<Object> getQueryOrIdList()
     {
         return queryOrIdList;
     }
+
+    public long getTimestamp(QueryState state)
+    {
+        return wrapped.getTimestamp(state);
+    }
+
+    private static class WithoutPerStatementVariables extends BatchQueryOptions
+    {
+        private WithoutPerStatementVariables(QueryOptions wrapped, List<Object> queryOrIdList)
+        {
+            super(wrapped, queryOrIdList);
+        }
+
+        public QueryOptions forStatement(int i)
+        {
+            return wrapped;
+        }
+    }
+
+    private static class WithPerStatementVariables extends BatchQueryOptions
+    {
+        private final List<QueryOptions> perStatementOptions;
+
+        private WithPerStatementVariables(QueryOptions wrapped, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+        {
+            super(wrapped, queryOrIdList);
+            this.perStatementOptions = new ArrayList<>(variables.size());
+            for (final List<ByteBuffer> vars : variables)
+            {
+                perStatementOptions.add(new QueryOptions.QueryOptionsWrapper(wrapped)
+                {
+                    public List<ByteBuffer> getValues()
+                    {
+                        return vars;
+                    }
+                });
+            }
+        }
+
+        public QueryOptions forStatement(int i)
+        {
+            return perStatementOptions.get(i);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
index 67e7174..c2617fe 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -74,21 +74,21 @@ public class ColumnCondition
         value.collectMarkerSpecification(boundNames);
     }
 
-    public ColumnCondition.WithVariables with(List<ByteBuffer> variables)
+    public ColumnCondition.WithOptions with(QueryOptions options)
     {
-        return new WithVariables(variables);
+        return new WithOptions(options);
     }
 
-    public class WithVariables
+    public class WithOptions
     {
-        private final List<ByteBuffer> variables;
+        private final QueryOptions options;
 
-        private WithVariables(List<ByteBuffer> variables)
+        private WithOptions(QueryOptions options)
         {
-            this.variables = variables;
+            this.options = options;
         }
 
-        public boolean equalsTo(WithVariables other) throws InvalidRequestException
+        public boolean equalsTo(WithOptions other) throws InvalidRequestException
         {
             if (!column().equals(other.column()))
                 return false;
@@ -103,11 +103,11 @@ public class ColumnCondition
                                            ? Int32Type.instance
                                            : ((MapType)column.type).keys;
 
-                if (comparator.compare(collectionElement().bindAndGet(variables), other.collectionElement().bindAndGet(variables)) != 0)
+                if (comparator.compare(collectionElement().bindAndGet(options), other.collectionElement().bindAndGet(options)) != 0)
                     return false;
             }
 
-            return value().bindAndGet(variables).equals(other.value().bindAndGet(other.variables));
+            return value().bindAndGet(options).equals(other.value().bindAndGet(other.options));
         }
 
         private ColumnDefinition column()
@@ -127,7 +127,7 @@ public class ColumnCondition
 
         public ByteBuffer getCollectionElementValue() throws InvalidRequestException
         {
-            return collectionElement == null ? null : collectionElement.bindAndGet(variables);
+            return collectionElement == null ? null : collectionElement.bindAndGet(options);
         }
 
         /**
@@ -140,7 +140,7 @@ public class ColumnCondition
 
             assert collectionElement == null;
             Cell c = current.getColumn(current.metadata().comparator.create(rowPrefix, column));
-            ByteBuffer v = value.bindAndGet(variables);
+            ByteBuffer v = value.bindAndGet(options);
             return v == null
                  ? c == null || !c.isLive(now)
                  : c != null && c.isLive(now) && c.value().equals(v);
@@ -148,15 +148,15 @@ public class ColumnCondition
 
         private boolean collectionAppliesTo(CollectionType type, Composite rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
         {
-            Term.Terminal v = value.bind(variables);
+            Term.Terminal v = value.bind(options);
 
             // For map element access, we won't iterate over the collection, so deal with that first. In other case, we do.
             if (collectionElement != null && type instanceof MapType)
             {
-                ByteBuffer e = collectionElement.bindAndGet(variables);
+                ByteBuffer e = collectionElement.bindAndGet(options);
                 if (e == null)
                     throw new InvalidRequestException("Invalid null value for map access");
-                return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(), now);
+                return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(options), now);
             }
 
             CellName name = current.metadata().comparator.create(rowPrefix, column);
@@ -178,11 +178,11 @@ public class ColumnCondition
             if (collectionElement != null)
             {
                 assert type instanceof ListType;
-                ByteBuffer e = collectionElement.bindAndGet(variables);
+                ByteBuffer e = collectionElement.bindAndGet(options);
                 if (e == null)
                     throw new InvalidRequestException("Invalid null value for list access");
 
-                return listElementAppliesTo((ListType)type, iter, e, v.get());
+                return listElementAppliesTo((ListType)type, iter, e, v.get(options));
             }
 
             switch (type.kind)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 3b7b4c4..5af84f0 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -53,7 +53,7 @@ public abstract class Constants
         private final Term.Terminal NULL_VALUE = new Value(null)
         {
             @Override
-            public Terminal bind(List<ByteBuffer> values)
+            public Terminal bind(QueryOptions options)
             {
                 // We return null because that makes life easier for collections
                 return null;
@@ -246,13 +246,13 @@ public abstract class Constants
             this.bytes = bytes;
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
             return bytes;
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> values)
+        public ByteBuffer bindAndGet(QueryOptions options)
         {
             return bytes;
         }
@@ -267,11 +267,11 @@ public abstract class Constants
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
             try
             {
-                ByteBuffer value = values.get(bindIndex);
+                ByteBuffer value = options.getValues().get(bindIndex);
                 if (value != null)
                     receiver.type.validate(value);
                 return value;
@@ -282,9 +282,9 @@ public abstract class Constants
             }
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer bytes = bindAndGet(values);
+            ByteBuffer bytes = bindAndGet(options);
             return bytes == null ? null : new Constants.Value(bytes);
         }
     }
@@ -299,7 +299,7 @@ public abstract class Constants
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
             CellName cname = cf.getComparator().create(prefix, column);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer value = t.bindAndGet(params.options);
             cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
         }
     }
@@ -313,7 +313,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.variables);
+            ByteBuffer bytes = t.bindAndGet(params.options);
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
             long increment = ByteBufferUtil.toLong(bytes);
@@ -331,7 +331,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.variables);
+            ByteBuffer bytes = t.bindAndGet(params.options);
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 580a2c9..751ccdb 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -83,7 +83,7 @@ public abstract class Lists
                 values.add(t);
             }
             DelayedValue value = new DelayedValue(values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -128,13 +128,13 @@ public abstract class Lists
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, ListType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, ListType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                List<?> l = (List<?>)type.compose(value);
+                List<?> l = (List<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 List<ByteBuffer> elements = new ArrayList<ByteBuffer>(l.size());
                 for (Object element : l)
                     elements.add(type.elements.decompose(element));
@@ -146,9 +146,9 @@ public abstract class Lists
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
-            return CollectionType.pack(elements, elements.size());
+            return CollectionSerializer.pack(elements, elements.size(), options.getProtocolVersion());
         }
     }
 
@@ -180,12 +180,12 @@ public abstract class Lists
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(elements.size());
             for (Term t : elements)
             {
-                ByteBuffer bytes = t.bindAndGet(values);
+                ByteBuffer bytes = t.bindAndGet(options);
 
                 if (bytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
@@ -210,10 +210,10 @@ public abstract class Lists
             assert receiver.type instanceof ListType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type, options.getProtocolVersion());
 
         }
     }
@@ -299,8 +299,8 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer index = idx.bindAndGet(params.variables);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer index = idx.bindAndGet(params.options);
+            ByteBuffer value = t.bindAndGet(params.options);
 
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list index");
@@ -342,7 +342,7 @@ public abstract class Lists
 
         static void doAppend(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             // If we append null, do nothing. Note that for Setter, we've
             // already removed the previous value so we're good here too
             if (value == null)
@@ -367,7 +367,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -403,7 +403,7 @@ public abstract class Lists
             if (existingList.isEmpty())
                 return;
 
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -437,7 +437,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal index = t.bind(params.variables);
+            Term.Terminal index = t.bind(params.options);
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list index");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index d113b57..0c4980c 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -86,7 +86,7 @@ public abstract class Maps
                 values.put(k, v);
             }
             DelayedValue value = new DelayedValue(((MapType)receiver.type).keys, values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -142,13 +142,13 @@ public abstract class Maps
             this.map = map;
         }
 
-        public static Value fromSerialized(ByteBuffer value, MapType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, MapType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Map<?, ?> m = (Map<?, ?>)type.compose(value);
+                Map<?, ?> m = (Map<?, ?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 Map<ByteBuffer, ByteBuffer> map = new LinkedHashMap<ByteBuffer, ByteBuffer>(m.size());
                 for (Map.Entry<?, ?> entry : m.entrySet())
                     map.put(type.keys.decompose(entry.getKey()), type.values.decompose(entry.getValue()));
@@ -160,7 +160,7 @@ public abstract class Maps
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(2 * map.size());
             for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
@@ -168,7 +168,7 @@ public abstract class Maps
                 buffers.add(entry.getKey());
                 buffers.add(entry.getValue());
             }
-            return CollectionType.pack(buffers, map.size());
+            return CollectionSerializer.pack(buffers, map.size(), options.getProtocolVersion());
         }
     }
 
@@ -194,13 +194,13 @@ public abstract class Maps
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             Map<ByteBuffer, ByteBuffer> buffers = new TreeMap<ByteBuffer, ByteBuffer>(comparator);
             for (Map.Entry<Term, Term> entry : elements.entrySet())
             {
                 // We don't support values > 64K because the serialization format encode the length as an unsigned short.
-                ByteBuffer keyBytes = entry.getKey().bindAndGet(values);
+                ByteBuffer keyBytes = entry.getKey().bindAndGet(options);
                 if (keyBytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
                 if (keyBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -208,7 +208,7 @@ public abstract class Maps
                                                                     FBUtilities.MAX_UNSIGNED_SHORT,
                                                                     keyBytes.remaining()));
 
-                ByteBuffer valueBytes = entry.getValue().bindAndGet(values);
+                ByteBuffer valueBytes = entry.getValue().bindAndGet(options);
                 if (valueBytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
                 if (valueBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -230,10 +230,10 @@ public abstract class Maps
             assert receiver.type instanceof MapType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type, options.getProtocolVersion());
         }
     }
 
@@ -272,8 +272,8 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer key = k.bindAndGet(params.variables);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer key = k.bindAndGet(params.options);
+            ByteBuffer value = t.bindAndGet(params.options);
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
 
@@ -310,7 +310,7 @@ public abstract class Maps
 
         static void doPut(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
             assert value instanceof Maps.Value;
@@ -333,7 +333,7 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal key = t.bind(params.variables);
+            Term.Terminal key = t.bind(params.options);
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
             assert key instanceof Constants.Value;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 4d72333..2f28812 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.QueryState;
@@ -28,7 +29,7 @@ public interface QueryHandler
 {
     public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
     public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException;
-    public CQLStatement getPrepared(MD5Digest id);
+    public ParsedStatement.Prepared getPrepared(MD5Digest id);
     public CQLStatement getPreparedForThrift(Integer id);
     public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
     public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException;