You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:23:02 UTC

[1/5] Native protocol v3

Repository: cassandra
Updated Branches:
  refs/heads/trunk ad3424707 -> 6b4d98035


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index ef30a22..ec96ed1 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
@@ -61,8 +62,11 @@ public class BatchMessage extends Message.Request
                     throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
                 variables.add(CBUtil.readValueList(body));
             }
-            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-            return new BatchMessage(toType(type), queryOrIds, variables, consistency);
+            QueryOptions options = version < 3
+                                 ? QueryOptions.fromPreV3Batch(CBUtil.readConsistencyLevel(body))
+                                 : QueryOptions.codec.decode(body, version);
+
+            return new BatchMessage(toType(type), queryOrIds, variables, options);
         }
 
         public void encode(BatchMessage msg, ByteBuf dest, int version)
@@ -84,7 +88,10 @@ public class BatchMessage extends Message.Request
                 CBUtil.writeValueList(msg.values.get(i), dest);
             }
 
-            CBUtil.writeConsistencyLevel(msg.consistency, dest);
+            if (version < 3)
+                CBUtil.writeConsistencyLevel(msg.options.getConsistency(), dest);
+            else
+                QueryOptions.codec.encode(msg.options, dest, version);
         }
 
         public int encodedSize(BatchMessage msg, int version)
@@ -99,7 +106,9 @@ public class BatchMessage extends Message.Request
 
                 size += CBUtil.sizeOfValueList(msg.values.get(i));
             }
-            size += CBUtil.sizeOfConsistencyLevel(msg.consistency);
+            size += version < 3
+                  ? CBUtil.sizeOfConsistencyLevel(msg.options.getConsistency())
+                  : QueryOptions.codec.encodedSize(msg.options, version);
             return size;
         }
 
@@ -131,15 +140,15 @@ public class BatchMessage extends Message.Request
     public final BatchStatement.Type type;
     public final List<Object> queryOrIdList;
     public final List<List<ByteBuffer>> values;
-    public final ConsistencyLevel consistency;
+    public final QueryOptions options;
 
-    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency)
+    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, QueryOptions options)
     {
         super(Message.Type.BATCH);
         this.type = type;
         this.queryOrIdList = queryOrIdList;
         this.values = values;
-        this.consistency = consistency;
+        this.options = options;
     }
 
     public Message.Response execute(QueryState state)
@@ -161,27 +170,39 @@ public class BatchMessage extends Message.Request
             }
 
             QueryHandler handler = state.getClientState().getCQLQueryHandler();
-            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
+            List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
-                CQLStatement statement;
+                ParsedStatement.Prepared p;
                 if (query instanceof String)
                 {
-                    statement = QueryProcessor.parseStatement((String)query, state);
+                    p = QueryProcessor.parseStatement((String)query, state);
                 }
                 else
                 {
-                    statement = handler.getPrepared((MD5Digest)query);
-                    if (statement == null)
+                    p = handler.getPrepared((MD5Digest)query);
+                    if (p == null)
                         throw new PreparedQueryNotFoundException((MD5Digest)query);
                 }
 
                 List<ByteBuffer> queryValues = values.get(i);
-                if (queryValues.size() != statement.getBoundTerms())
+                if (queryValues.size() != p.statement.getBoundTerms())
                     throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables",
-                                                                    statement.getBoundTerms(),
+                                                                    p.statement.getBoundTerms(),
                                                                     queryValues.size()));
+
+                prepared.add(p);
+            }
+
+            BatchQueryOptions batchOptions = BatchQueryOptions.withPerStatementVariables(options, values, queryOrIdList);
+            List<ModificationStatement> statements = new ArrayList<>(prepared.size());
+            for (int i = 0; i < prepared.size(); i++)
+            {
+                ParsedStatement.Prepared p = prepared.get(i);
+                batchOptions.forStatement(i).prepare(p.boundNames);
+                CQLStatement statement = p.statement;
+
                 if (!(statement instanceof ModificationStatement))
                     throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
 
@@ -202,7 +223,7 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none());
-            Message.Response response = handler.processBatch(batch, state, new BatchQueryOptions(consistency, values, queryOrIdList));
+            Message.Response response = handler.processBatch(batch, state, batchOptions);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);
@@ -229,7 +250,7 @@ public class BatchMessage extends Message.Request
             if (i > 0) sb.append(", ");
             sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values");
         }
-        sb.append("] at consistency ").append(consistency);
+        sb.append("] at consistency ").append(options.getConsistency());
         return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 890b9d1..f3ab526 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -28,17 +28,17 @@ public class EventMessage extends Message.Response
     {
         public EventMessage decode(ByteBuf body, int version)
         {
-            return new EventMessage(Event.deserialize(body));
+            return new EventMessage(Event.deserialize(body, version));
         }
 
         public void encode(EventMessage msg, ByteBuf dest, int version)
         {
-            msg.event.serialize(dest);
+            msg.event.serialize(dest, version);
         }
 
         public int encodedSize(EventMessage msg, int version)
         {
-            return msg.event.serializedSize();
+            return msg.event.serializedSize(version);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index caec43f..d618f43 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -27,6 +27,7 @@ import io.netty.buffer.ByteBuf;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.QueryState;
@@ -100,7 +101,9 @@ public class ExecuteMessage extends Message.Request
         try
         {
             QueryHandler handler = state.getClientState().getCQLQueryHandler();
-            CQLStatement statement = handler.getPrepared(statementId);
+            ParsedStatement.Prepared prepared = handler.getPrepared(statementId);
+            options.prepare(prepared.boundNames);
+            CQLStatement statement = prepared.statement;
 
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
new file mode 100644
index 0000000..9b66efb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
+
+import org.junit.Test;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.transport.Event.TopologyChange;
+import org.apache.cassandra.transport.Event.SchemaChange;
+import org.apache.cassandra.transport.Event.StatusChange;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+/**
+ * Serialization/deserialization tests for protocol objects and messages.
+ */
+public class SerDeserTest
+{
+    @Test
+    public void collectionSerDeserTest() throws Exception
+    {
+        collectionSerDeserTest(2);
+        collectionSerDeserTest(3);
+    }
+
+    public void collectionSerDeserTest(int version) throws Exception
+    {
+        // Lists
+        ListType<?> lt = ListType.getInstance(Int32Type.instance);
+        List<Integer> l = Arrays.asList(2, 6, 1, 9);
+
+        List<ByteBuffer> lb = new ArrayList<>(l.size());
+        for (Integer i : l)
+            lb.add(Int32Type.instance.decompose(i));
+
+        assertEquals(l, lt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(lb, lb.size(), version), version));
+
+        // Sets
+        SetType<?> st = SetType.getInstance(UTF8Type.instance);
+        Set<String> s = new LinkedHashSet<>();
+        s.addAll(Arrays.asList("bar", "foo", "zee"));
+
+        List<ByteBuffer> sb = new ArrayList<>(s.size());
+        for (String t : s)
+            sb.add(UTF8Type.instance.decompose(t));
+
+        assertEquals(s, st.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(sb, sb.size(), version), version));
+
+        // Maps
+        MapType<?, ?> mt = MapType.getInstance(UTF8Type.instance, LongType.instance);
+        Map<String, Long> m = new LinkedHashMap<>();
+        m.put("bar", 12L);
+        m.put("foo", 42L);
+        m.put("zee", 14L);
+
+        List<ByteBuffer> mb = new ArrayList<>(m.size() * 2);
+        for (Map.Entry<String, Long> entry : m.entrySet())
+        {
+            mb.add(UTF8Type.instance.decompose(entry.getKey()));
+            mb.add(LongType.instance.decompose(entry.getValue()));
+        }
+
+        assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(CollectionSerializer.pack(mb, m.size(), version), version));
+    }
+
+    @Test
+    public void eventSerDeserTest() throws Exception
+    {
+        eventSerDeserTest(2);
+        eventSerDeserTest(3);
+    }
+
+    public void eventSerDeserTest(int version) throws Exception
+    {
+        List<Event> events = new ArrayList<>();
+
+        events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddress(), 42));
+        events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddress(), 42));
+        events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddress(), 42));
+
+        events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddress(), 42));
+        events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddress(), 42));
+
+        events.add(new SchemaChange(SchemaChange.Change.CREATED, "ks"));
+        events.add(new SchemaChange(SchemaChange.Change.UPDATED, "ks"));
+        events.add(new SchemaChange(SchemaChange.Change.DROPPED, "ks"));
+
+        events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.TABLE, "ks", "table"));
+        events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.TABLE, "ks", "table"));
+        events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TABLE, "ks", "table"));
+
+        if (version >= 3)
+        {
+            events.add(new SchemaChange(SchemaChange.Change.CREATED, SchemaChange.Target.TYPE, "ks", "type"));
+            events.add(new SchemaChange(SchemaChange.Change.UPDATED, SchemaChange.Target.TYPE, "ks", "type"));
+            events.add(new SchemaChange(SchemaChange.Change.DROPPED, SchemaChange.Target.TYPE, "ks", "type"));
+        }
+
+        for (Event ev : events)
+        {
+            ByteBuf buf = Unpooled.buffer(ev.serializedSize(version));
+            ev.serialize(buf, version);
+            assertEquals(ev, Event.deserialize(buf, version));
+        }
+    }
+
+    private static ByteBuffer bb(String str)
+    {
+        return UTF8Type.instance.decompose(str);
+    }
+
+    private static ColumnIdentifier ci(String name)
+    {
+        return new ColumnIdentifier(name, false);
+    }
+
+    private static Constants.Literal lit(long v)
+    {
+        return Constants.Literal.integer(String.valueOf(v));
+    }
+
+    private static Constants.Literal lit(String v)
+    {
+        return Constants.Literal.string(v);
+    }
+
+    private static ColumnSpecification columnSpec(String name, AbstractType<?> type)
+    {
+        return new ColumnSpecification("ks", "cf", ci(name), type);
+    }
+
+    @Test
+    public void udtSerDeserTest() throws Exception
+    {
+        udtSerDeserTest(2);
+        udtSerDeserTest(3);
+    }
+
+    public void udtSerDeserTest(int version) throws Exception
+    {
+        ListType<?> lt = ListType.getInstance(Int32Type.instance);
+        SetType<?> st = SetType.getInstance(UTF8Type.instance);
+        MapType<?, ?> mt = MapType.getInstance(UTF8Type.instance, LongType.instance);
+
+        UserType udt = new UserType("ks",
+                                    bb("myType"),
+                                    Arrays.asList(bb("f1"), bb("f2"), bb("f3"), bb("f4")),
+                                    Arrays.asList(LongType.instance, lt, st, mt));
+
+        Map<ColumnIdentifier, Term.Raw> value = new HashMap<>();
+        value.put(ci("f1"), lit(42));
+        value.put(ci("f2"), new Lists.Literal(Arrays.<Term.Raw>asList(lit(3), lit(1))));
+        value.put(ci("f3"), new Sets.Literal(Arrays.<Term.Raw>asList(lit("foo"), lit("bar"))));
+        value.put(ci("f4"), new Maps.Literal(Arrays.<Pair<Term.Raw, Term.Raw>>asList(
+                                   Pair.<Term.Raw, Term.Raw>create(lit("foo"), lit(24)),
+                                   Pair.<Term.Raw, Term.Raw>create(lit("bar"), lit(12)))));
+
+        UserTypes.Literal u = new UserTypes.Literal(value);
+        Term t = u.prepare("ks", columnSpec("myValue", udt));
+
+        QueryOptions options = QueryOptions.DEFAULT;
+        if (version == 2)
+            options = QueryOptions.fromProtocolV2(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+        else if (version != 3)
+            throw new AssertionError("Invalid protocol version for test");
+
+        ByteBuffer serialized = t.bindAndGet(options);
+
+        ByteBuffer[] fields = udt.split(serialized);
+
+        assertEquals(4, fields.length);
+
+        assertEquals(bytes(42L), fields[0]);
+
+        // Note that no matter which protocol version was used in bindAndGet above, the collections inside
+        // a UDT should always be serialized with version 3 of the protocol, which is why we deliberately
+        // don't use 'version' below.
+
+        assertEquals(Arrays.asList(3, 1), lt.getSerializer().deserializeForNativeProtocol(fields[1], 3));
+
+        LinkedHashSet<String> s = new LinkedHashSet<>();
+        s.addAll(Arrays.asList("bar", "foo"));
+        assertEquals(s, st.getSerializer().deserializeForNativeProtocol(fields[2], 3));
+
+        LinkedHashMap<String, Long> m = new LinkedHashMap<>();
+        m.put("bar", 12L);
+        m.put("foo", 24L);
+        assertEquals(m, mt.getSerializer().deserializeForNativeProtocol(fields[3], 3));
+    }
+}


[4/5] git commit: Native protocol v3

Posted by sl...@apache.org.
Native protocol v3

patch by slebresne; reviewed by thobbs for CASSANDRA-6855


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9872b74e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9872b74e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9872b74e

Branch: refs/heads/trunk
Commit: 9872b74ef20018e4e7645a8952fd7295e75764ad
Parents: ece3864
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Mar 12 18:58:55 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Apr 30 20:21:35 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 doc/native_protocol_v3.spec                     | 911 +++++++++++++++++++
 src/java/org/apache/cassandra/auth/Auth.java    |  16 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   6 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   4 +-
 .../org/apache/cassandra/cql3/Attributes.java   |   8 +-
 .../cassandra/cql3/BatchQueryOptions.java       |  81 +-
 .../apache/cassandra/cql3/ColumnCondition.java  |  32 +-
 .../org/apache/cassandra/cql3/Constants.java    |  20 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  34 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  32 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |   3 +-
 .../org/apache/cassandra/cql3/QueryOptions.java | 283 ++++--
 .../apache/cassandra/cql3/QueryProcessor.java   |  29 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   6 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |  26 +-
 src/java/org/apache/cassandra/cql3/Term.java    |  18 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   6 +-
 .../org/apache/cassandra/cql3/UserTypes.java    |  18 +-
 .../cassandra/cql3/functions/FunctionCall.java  |  20 +-
 .../cql3/statements/BatchStatement.java         |  95 +-
 .../cql3/statements/CQL3CasConditions.java      |  14 +-
 .../cql3/statements/ModificationStatement.java  |  63 +-
 .../cassandra/cql3/statements/Restriction.java  |  28 +-
 .../cql3/statements/SelectStatement.java        | 150 +--
 .../org/apache/cassandra/db/DefsTables.java     |  19 +-
 .../cassandra/db/marshal/CollectionType.java    |  29 +-
 .../apache/cassandra/db/marshal/ListType.java   |  12 +-
 .../apache/cassandra/db/marshal/MapType.java    |  21 +-
 .../apache/cassandra/db/marshal/SetType.java    |  15 +-
 .../apache/cassandra/db/marshal/UserType.java   |   5 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  11 +-
 .../serializers/CollectionSerializer.java       | 106 ++-
 .../cassandra/serializers/ListSerializer.java   |  39 +-
 .../cassandra/serializers/MapSerializer.java    |  48 +-
 .../cassandra/serializers/SetSerializer.java    |  39 +-
 .../cassandra/service/IMigrationListener.java   |   3 +
 .../cassandra/service/MigrationManager.java     |  18 +
 .../cassandra/thrift/CassandraServer.java       |   4 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  17 +
 .../org/apache/cassandra/transport/Client.java  |   4 +-
 .../apache/cassandra/transport/DataType.java    |  79 +-
 .../org/apache/cassandra/transport/Event.java   | 158 +++-
 .../apache/cassandra/transport/OptionCodec.java |  28 +-
 .../org/apache/cassandra/transport/Server.java  |  21 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/BatchMessage.java        |  53 +-
 .../transport/messages/EventMessage.java        |   6 +-
 .../transport/messages/ExecuteMessage.java      |   5 +-
 .../cassandra/transport/SerDeserTest.java       | 217 +++++
 52 files changed, 2232 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64e5afb..a4811f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,7 @@
  * Preemptive opening of compaction result (CASSANDRA-6916)
  * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
  * Optimize cellname comparison (CASSANDRA-6934)
+ * Native protocol v3 (CASSANDRA-6855)
 Merged from 2.0:
  * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072)
  * Set JMX RMI port to 7199 (CASSANDRA-7087)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index d2b892e..ba29b37 100644
--- a/build.xml
+++ b/build.xml
@@ -387,7 +387,7 @@
           <dependency groupId="com.addthis.metrics" artifactId="reporter-config" version="2.1.0" />
           <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
           <dependency groupId="io.airlift" artifactId="airline" version="0.6" />
-          <dependency groupId="io.netty" artifactId="netty" version="3.6.6.Final" />
+          <dependency groupId="io.netty" artifactId="netty" version="4.0.17.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.0.1" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
new file mode 100644
index 0000000..6662f1c
--- /dev/null
+++ b/doc/native_protocol_v3.spec
@@ -0,0 +1,911 @@
+
+                             CQL BINARY PROTOCOL v3
+
+
+Table of Contents
+
+  1. Overview
+  2. Frame header
+    2.1. version
+    2.2. flags
+    2.3. stream
+    2.4. opcode
+    2.5. length
+  3. Notations
+  4. Messages
+    4.1. Requests
+      4.1.1. STARTUP
+      4.1.2. AUTH_RESPONSE
+      4.1.3. OPTIONS
+      4.1.4. QUERY
+      4.1.5. PREPARE
+      4.1.6. EXECUTE
+      4.1.7. BATCH
+      4.1.8. REGISTER
+    4.2. Responses
+      4.2.1. ERROR
+      4.2.2. READY
+      4.2.3. AUTHENTICATE
+      4.2.4. SUPPORTED
+      4.2.5. RESULT
+        4.2.5.1. Void
+        4.2.5.2. Rows
+        4.2.5.3. Set_keyspace
+        4.2.5.4. Prepared
+        4.2.5.5. Schema_change
+      4.2.6. EVENT
+      4.2.7. AUTH_CHALLENGE
+      4.2.8. AUTH_SUCCESS
+  5. Compression
+  6. Collection types
+  7. User Defined types
+  8. Result paging
+  9. Error codes
+  10. Changes from v2
+
+
+1. Overview
+
+  The CQL binary protocol is a frame based protocol. Frames are defined as:
+
+      0         8        16        24        32
+      +---------+---------+---------+---------+
+      | version |  flags  | stream  | opcode  |
+      +---------+---------+---------+---------+
+      |                length                 |
+      +---------+---------+---------+---------+
+      |                                       |
+      .            ...  body ...              .
+      .                                       .
+      .                                       .
+      +----------------------------------------
+
+  The protocol is big-endian (network byte order).
+
+  Each frame contains a fixed size header (8 bytes) followed by a variable size
+  body. The header is described in Section 2. The content of the body depends
+  on the header opcode value (the body can in particular be empty for some
+  opcode values). The list of allowed opcodes is defined in Section 2.4 and the
+  details of each corresponding message are described in Section 4.
+
+  The protocol distinguishes 2 types of frames: requests and responses. Requests
+  are the frames sent by the clients to the server, responses are the ones sent
+  by the server. Note however that the protocol supports server pushes (events)
+  so responses do not necessarily come right after a client request.
+
+  Note to client implementors: client libraries should always assume that the
+  body of a given frame may contain more data than what is described in this
+  document. It will however always be safe to ignore the remainder of the frame
+  body in such cases. The reason is that this may allow the protocol to be
+  extended with optional features without needing to change the protocol
+  version.
+
+
+
+2. Frame header
+
+2.1. version
+
+  The version is a single byte that indicates both the direction of the message
+  (request or response) and the version of the protocol in use. The top-most bit
+  of version is used to define the direction of the message: 0 indicates a
+  request, 1 indicates a response. This can be useful for protocol analyzers to
+  distinguish the nature of the packet from the direction in which it is moving.
+  The rest of that byte is the protocol version (3 for the protocol defined in
+  this document). In other words, for this version of the protocol, version will
+  be one of:
+    0x03    Request frame for this protocol version
+    0x83    Response frame for this protocol version
+
+  Please note that while every message ships with the version, only one version
+  of messages is accepted on a given connection. In other words, the first message
+  exchanged (STARTUP) sets the version for the connection for the lifetime of this
+  connection.
+
+  This document describes version 3 of the protocol. For the changes made since
+  version 2, see Section 10.
+
+
+2.2. flags
+
+  Flags applying to this frame. The flags have the following meaning (described
+  by the mask that allows selecting them):
+    0x01: Compression flag. If set, the frame body is compressed. The actual
+          compression to use should have been set up beforehand through the
+          Startup message (which thus cannot be compressed; Section 4.1.1).
+    0x02: Tracing flag. For a request frame, this indicates the client requires
+          tracing of the request. Note that not all requests support tracing.
+          Currently, only QUERY, PREPARE and EXECUTE queries support tracing.
+          Other requests will simply ignore the tracing flag if set. If a
+          request supports tracing and the tracing flag was set, the response to
+          this request will have the tracing flag set and contain tracing
+          information.
+          If a response frame has the tracing flag set, its body contains
+          a tracing ID. The tracing ID is a [uuid] and is the first thing in
+          the frame body. The rest of the body will then be the usual body
+          corresponding to the response opcode.
+
+  The rest of the flags are currently unused and ignored.
+
+2.3. stream
+
+  A frame has a stream id (one signed byte). When sending request messages, this
+  stream id must be set by the client to a non-negative byte (negative stream ids
+  are reserved for streams initiated by the server; currently all EVENT messages
+  (Section 4.2.6) have a stream id of -1). If a client sends a request message
+  with the stream id X, it is guaranteed that the stream id of the response to
+  that message will be X.
+
+  This allows dealing with the asynchronous nature of the protocol. If a client
+  sends multiple messages simultaneously (without waiting for responses), there
+  is no guarantee on the order of the responses. For instance, if the client
+  writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
+  respond to REQ_3 (or REQ_2) first. Assigning different stream ids to these 3
+  requests allows the client to distinguish which request a received answer
+  responds to. As there can only be 128 different simultaneous streams, it is up
+  to the client to reuse stream ids.
+
+  Note that clients are free to use the protocol synchronously (i.e. wait for
+  the response to REQ_N before sending REQ_N+1). In that case, the stream id
+  can be safely set to 0. Clients should also feel free to use only a subset of
+  the 128 maximum possible stream ids if that is simpler for their
+  implementation.
+
+2.4. opcode
+
+  An integer byte that distinguishes the actual message:
+    0x00    ERROR
+    0x01    STARTUP
+    0x02    READY
+    0x03    AUTHENTICATE
+    0x05    OPTIONS
+    0x06    SUPPORTED
+    0x07    QUERY
+    0x08    RESULT
+    0x09    PREPARE
+    0x0A    EXECUTE
+    0x0B    REGISTER
+    0x0C    EVENT
+    0x0D    BATCH
+    0x0E    AUTH_CHALLENGE
+    0x0F    AUTH_RESPONSE
+    0x10    AUTH_SUCCESS
+
+  Messages are described in Section 4.
+
+  (Note that there is no 0x04 message in this version of the protocol)
+
+
+2.5. length
+
+  A 4 byte integer representing the length of the body of the frame (note:
+  currently a frame is limited to 256MB in length).
+
+
+3. Notations
+
+  To describe the layout of the frame body for the messages in Section 4, we
+  define the following:
+
+    [int]          A 4 bytes integer
+    [long]         A 8 bytes integer
+    [short]        A 2 bytes unsigned integer
+    [string]       A [short] n, followed by n bytes representing an UTF-8
+                   string.
+    [long string]  An [int] n, followed by n bytes representing an UTF-8 string.
+    [uuid]         A 16 bytes long uuid.
+    [string list]  A [short] n, followed by n [string].
+    [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
+                   no byte should follow and the value represented is `null`.
+    [short bytes]  A [short] n, followed by n bytes if n >= 0.
+
+    [option]       A pair of <id><value> where <id> is a [short] representing
+                   the option id and <value> depends on that option (and can be
+                   of size 0). The supported id (and the corresponding <value>)
+                   will be described when this is used.
+    [option list]  A [short] n, followed by n [option].
+    [inet]         An address (ip and port) to a node. It consists of one
+                   [byte] n, that represents the address size, followed by n
+                   [byte] representing the IP address (in practice n can only be
+                   either 4 (IPv4) or 16 (IPv6)), followed by one [int]
+                   representing the port.
+    [consistency]  A consistency level specification. This is a [short]
+                   representing a consistency level with the following
+                   correspondence:
+                     0x0000    ANY
+                     0x0001    ONE
+                     0x0002    TWO
+                     0x0003    THREE
+                     0x0004    QUORUM
+                     0x0005    ALL
+                     0x0006    LOCAL_QUORUM
+                     0x0007    EACH_QUORUM
+                     0x0008    SERIAL
+                     0x0009    LOCAL_SERIAL
+                     0x000A    LOCAL_ONE
+
+    [string map]      A [short] n, followed by n pair <k><v> where <k> and <v>
+                      are [string].
+    [string multimap] A [short] n, followed by n pair <k><v> where <k> is a
+                      [string] and <v> is a [string list].
+
+
+4. Messages
+
+4.1. Requests
+
+  Note that outside of their normal responses (described below), all requests
+  can get an ERROR message (Section 4.2.1) as response.
+
+4.1.1. STARTUP
+
+  Initialize the connection. The server will respond by either a READY message
+  (in which case the connection is ready for queries) or an AUTHENTICATE message
+  (in which case credentials will need to be provided using AUTH_RESPONSE).
+
+  This must be the first message of the connection, except for OPTIONS that can
+  be sent before to find out the options supported by the server. Once the
+  connection has been initialized, a client should not send any more STARTUP
+  message.
+
+  The body is a [string map] of options. Possible options are:
+    - "CQL_VERSION": the version of CQL to use. This option is mandatory and
+      currently, the only version supported is "3.0.0". Note that this is
+      different from the protocol version.
+    - "COMPRESSION": the compression algorithm to use for frames (See section 5).
+      This is optional, if not specified no compression will be used.
+
+
+4.1.2. AUTH_RESPONSE
+
+  Answers a server authentication challenge.
+
+  Authentication in the protocol is SASL based. The server sends authentication
+  challenges (a bytes token) to which the client answers with this message. Those
+  exchanges continue until the server accepts the authentication by sending an
+  AUTH_SUCCESS message after a client AUTH_RESPONSE. It is however the client that
+  initiates the exchange by sending an initial AUTH_RESPONSE in response to a
+  server AUTHENTICATE request.
+
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depends on the actual
+  authenticator used.
+
+  The response to an AUTH_RESPONSE is either a follow-up AUTH_CHALLENGE message,
+  an AUTH_SUCCESS message or an ERROR message.
+
+
+4.1.3. OPTIONS
+
+  Asks the server to return what STARTUP options are supported. The body of an
+  OPTIONS message should be empty and the server will respond with a SUPPORTED
+  message.
+
+
+4.1.4. QUERY
+
+  Performs a CQL query. The body of the message must be:
+    <query><query_parameters>
+  where <query> is a [long string] representing the query and
+  <query_parameters> must be
+    <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>]
+  where:
+    - <consistency> is the [consistency] level for the operation.
+    - <flags> is a [byte] whose bits define the options for this query and
+      in particular influence what the remainder of the message contains.
+      A flag is set if the bit corresponding to its `mask` is set. Supported
+      flags are, given their mask:
+        0x01: Values. In that case, a [short] <n> followed by <n> [bytes]
+              values are provided. Those values are used for bound variables in
+              the query. Optionally, if the 0x40 flag is present, each value
+              will be preceded by a [string] name, representing the name of
+              the marker the value must be bound to. This is optional, and
+              if not present, values will be bound by position.
+        0x02: Skip_metadata. If present, the Result Set returned as a response
+              to that query (if any) will have the NO_METADATA flag (see
+              Section 4.2.5.2).
+        0x04: Page_size. In that case, <result_page_size> is an [int]
+              controlling the desired page size of the result (in CQL3 rows).
+              See the section on paging (Section 8) for more details.
+        0x08: With_paging_state. If present, <paging_state> should be present.
+              <paging_state> is a [bytes] value that should have been returned
+              in a result set (Section 4.2.5.2). If provided, the query will be
+              executed but starting from a given paging state. This also allows
+              paging to continue on a different node from the one where it was
+              started (See Section 8 for more details).
+        0x10: With serial consistency. If present, <serial_consistency> should be
+              present. <serial_consistency> is the [consistency] level for the
+              serial phase of conditional updates. That consistency can only be
+              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+              SERIAL. This option will be ignored for anything other than a
+              conditional update/insert.
+        0x20: With default timestamp. If present, <timestamp> should be present.
+              <timestamp> is a [long] representing the default timestamp for the query
+              in microseconds (negative values are forbidden). If provided, this will
+              replace the server side assigned timestamp as default timestamp.
+              Note that a timestamp in the query itself will still override
+              this timestamp. This is entirely optional.
+        0x40: With names for values. This only makes sense if the 0x01 flag is set and
+              is ignored otherwise. If present, the values from the 0x01 flag will
+              be preceded by a name (see above). Note that this is only useful for
+              QUERY requests where named bind markers are used; for EXECUTE statements,
+              since the names for the expected values were returned during preparation,
+              a client can always provide values in the right order without any names
+              and using this flag, while supported, is almost surely inefficient.
+
+  Note that the consistency is ignored by some queries (USE, CREATE, ALTER,
+  TRUNCATE, ...).
+
+  The server will respond to a QUERY message with a RESULT message, the content
+  of which depends on the query.
+
+
+4.1.5. PREPARE
+
+  Prepare a query for later execution (through EXECUTE). The body consists of
+  the CQL query to prepare as a [long string].
+
+  The server will respond with a RESULT message with a `prepared` kind (0x0004,
+  see Section 4.2.5).
+
+
+4.1.6. EXECUTE
+
+  Executes a prepared query. The body of the message must be:
+    <id><query_parameters>
+  where <id> is the prepared query ID. It's the [short bytes] returned as a
+  response to a PREPARE message. As for <query_parameters>, it has the exact
+  same definition as in QUERY (see Section 4.1.4).
+
+  The response from the server will be a RESULT message.
+
+
+4.1.7. BATCH
+
+  Allows executing a list of queries (prepared or not) as a batch (note that
+  only DML statements are accepted in a batch). The body of the message must
+  be:
+    <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>]
+  where:
+    - <type> is a [byte] indicating the type of batch to use:
+        - If <type> == 0, the batch will be "logged". This is equivalent to a
+          normal CQL3 batch statement.
+        - If <type> == 1, the batch will be "unlogged".
+        - If <type> == 2, the batch will be a "counter" batch (and non-counter
+          statements will be rejected).
+    - <flags> is a [byte] whose bits define the options for this query and
+      in particular influence what the remainder of the message contains. It is
+      similar to the <flags> from QUERY and EXECUTE methods, except that the 4
+      rightmost bits must always be 0 as their corresponding options do not make
+      sense for Batch. A flag is set if the bit corresponding to its `mask` is
+      set. Supported flags are, given their mask:
+        0x10: With serial consistency. If present, <serial_consistency> should be
+              present. <serial_consistency> is the [consistency] level for the
+              serial phase of conditional updates. That consistency can only be
+              either SERIAL or LOCAL_SERIAL and if not present, it defaults to
+              SERIAL. This option will be ignored for anything other than a
+              conditional update/insert.
+        0x20: With default timestamp. If present, <timestamp> should be present.
+              <timestamp> is a [long] representing the default timestamp for the query
+              in microseconds. If provided, this will replace the server side assigned
+              timestamp as default timestamp. Note that a timestamp in the query itself
+              will still override this timestamp. This is entirely optional.
+        0x40: With names for values. If set, then all values for all <query_i> must be
+              preceded by a [string] <name_i> that have the same meaning as in QUERY
+              requests.
+    - <n> is a [short] indicating the number of following queries.
+    - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the
+      form:
+        <kind><string_or_id><n>[<name_1>]<value_1>...[<name_n>]<value_n>
+      where:
+       - <kind> is a [byte] indicating whether the following query is a prepared
+         one or not. <kind> value must be either 0 or 1.
+       - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be
+         a [long string] query string (as in QUERY, the query string might contain
+         bind markers). Otherwise (that is, if <kind> == 1), it should be a
+         [short bytes] representing a prepared query ID.
+       - <n> is a [short] indicating the number (possibly 0) of following values.
+       - <name_i> is the optional name of the following <value_i>. It must be present
+         if and only if the 0x40 flag is provided for the batch.
+       - <value_i> is the [bytes] to use for bound variable i (of bound variable <name_i>
+         if the 0x40 flag is used).
+    - <consistency> is the [consistency] level for the operation.
+    - <serial_consistency> is only present if the 0x10 flag is set. In that case,
+      <serial_consistency> is the [consistency] level for the serial phase of
+      conditional updates. That consistency can only be either SERIAL or
+      LOCAL_SERIAL and if not present will default to SERIAL. This option will
+      be ignored for anything other than a conditional update/insert.
+
+  The server will respond with a RESULT message.
+
+
+4.1.8. REGISTER
+
+  Register this connection to receive some type of events. The body of the
+  message is a [string list] representing the event types to register to. See
+  section 4.2.6 for the list of valid event types.
+
+  The response to a REGISTER message will be a READY message.
+
+  Please note that if a client driver maintains multiple connections to a
+  Cassandra node and/or connections to multiple nodes, it is advised to
+  dedicate a handful of connections to receive events, but to *not* register
+  for events on all connections, as this would only result in receiving
+  multiple times the same event messages, wasting bandwidth.
+
+
+4.2. Responses
+
+  This section describes the content of the frame body for the different
+  responses. Please note that to make room for future evolution, clients should
+  tolerate extra information (which they should simply discard) at the end of
+  the frame body, beyond what is described in this document.
+
+4.2.1. ERROR
+
+  Indicates an error processing a request. The body of the message will be an
+  error code ([int]) followed by a [string] error message. Then, depending on
+  the exception, more content may follow. The error codes are defined in
+  Section 9, along with their additional content if any.
+
+
+4.2.2. READY
+
+  Indicates that the server is ready to process queries. This message will be
+  sent by the server either after a STARTUP message if no authentication is
+  required, or after a successful CREDENTIALS message.
+
+  The body of a READY message is empty.
+
+
+4.2.3. AUTHENTICATE
+
+  Indicates that the server requires authentication, and which authentication
+  mechanism to use.
+
+  The authentication is SASL based and thus consists of a number of server
+  challenges (AUTH_CHALLENGE, Section 4.2.7) followed by client responses
+  (AUTH_RESPONSE, Section 4.1.2). The initial exchange is however bootstrapped
+  by an initial client response. The details of that exchange (including how
+  many challenge-response pairs are required) are specific to the authenticator
+  in use. The exchange ends when the server sends an AUTH_SUCCESS message or
+  an ERROR message.
+
+  This message will be sent following a STARTUP message if authentication is
+  required and must be answered by an AUTH_RESPONSE message from the client.
+
+  The body consists of a single [string] indicating the full class name of the
+  IAuthenticator in use.
+
+
+4.2.4. SUPPORTED
+
+  Indicates which startup options are supported by the server. This message
+  comes as a response to an OPTIONS message.
+
+  The body of a SUPPORTED message is a [string multimap]. This multimap gives
+  for each of the supported STARTUP options, the list of supported values.
+
+
+4.2.5. RESULT
+
+  The result to a query (QUERY, PREPARE, EXECUTE or BATCH messages).
+
+  The first element of the body of a RESULT message is an [int] representing the
+  `kind` of result. The rest of the body depends on the kind. The kind can be
+  one of:
+    0x0001    Void: for results carrying no information.
+    0x0002    Rows: for results to select queries, returning a set of rows.
+    0x0003    Set_keyspace: the result to a `use` query.
+    0x0004    Prepared: result to a PREPARE message.
+    0x0005    Schema_change: the result to a schema altering query.
+
+  The body for each kind (after the [int] kind) is defined below.
+
+
+4.2.5.1. Void
+
+  The rest of the body for a Void result is empty. It indicates that a query was
+  successful without providing more information.
+
+
+4.2.5.2. Rows
+
+  Indicates a set of rows. The rest of body of a Rows result is:
+    <metadata><rows_count><rows_content>
+  where:
+    - <metadata> is composed of:
+        <flags><columns_count>[<paging_state>][<global_table_spec>?<col_spec_1>...<col_spec_n>]
+      where:
+        - <flags> is an [int]. The bits of <flags> provides information on the
+          formatting of the remaining information. A flag is set if the bit
+          corresponding to its `mask` is set. Supported flags are, given their
+          mask:
+            0x0001    Global_tables_spec: if set, only one table spec (keyspace
+                      and table name) is provided as <global_table_spec>. If not
+                      set, <global_table_spec> is not present.
+            0x0002    Has_more_pages: indicates whether this is not the last
+                      page of results and more should be retrieved. If set, the
+                      <paging_state> will be present. The <paging_state> is a
+                      [bytes] value that should be used in QUERY/EXECUTE to
+                      continue paging and retrieve the remainder of the result for
+                      this query (See Section 8 for more details).
+            0x0004    No_metadata: if set, the <metadata> is only composed of
+                      these <flags>, the <columns_count> and optionally the
+                      <paging_state> (depending on the Has_more_pages flag) but
+                      no other information (so no <global_table_spec> nor <col_spec_i>).
+                      This will only ever be the case if this was requested
+                      during the query (see QUERY and RESULT messages).
+        - <columns_count> is an [int] representing the number of columns selected
+          by the query this result is of. It defines the number of <col_spec_i>
+          elements in <metadata> and the number of elements for each row in
+          <rows_content>.
+        - <global_table_spec> is present if the Global_tables_spec is set in
+          <flags>. If present, it is composed of two [string] representing the
+          (unique) keyspace name and table name the columns returned belong to.
+        - <col_spec_i> specifies the columns returned in the query. There is
+          <column_count> such column specifications that are composed of:
+            (<ksname><tablename>)?<name><type>
+          The initial <ksname> and <tablename> are two [string] that are only
+          present if the Global_tables_spec flag is not set. The <name> is a
+          [string] and <type> is an [option] that correspond to the description
+          (what this description is depends a bit on the context: in results to
+          selects, this will be either the user chosen alias or the selection used
+          (often a column name, but it can be a function call too). In results to
+          a PREPARE, this will be either the name of the bind variable corresponding
+          or the column name for the variable if it is "anonymous") and type of
+          the corresponding result. The option for <type> is either a native
+          type (see below), in which case the option has no value, or a
+          'custom' type, in which case the value is a [string] representing
+          the full qualified class name of the type represented. Valid option
+          ids are:
+            0x0000    Custom: the value is a [string], see above.
+            0x0001    Ascii
+            0x0002    Bigint
+            0x0003    Blob
+            0x0004    Boolean
+            0x0005    Counter
+            0x0006    Decimal
+            0x0007    Double
+            0x0008    Float
+            0x0009    Int
+            0x000B    Timestamp
+            0x000C    Uuid
+            0x000D    Varchar
+            0x000E    Varint
+            0x000F    Timeuuid
+            0x0010    Inet
+            0x0020    List: the value is an [option], representing the type
+                            of the elements of the list.
+            0x0021    Map: the value is two [option], representing the types of the
+                           keys and values of the map
+            0x0022    Set: the value is an [option], representing the type
+                            of the elements of the set
+            0x0030    UDT: the value is <ks><udt_name><n><name_1><type_1>...<name_n><type_n>
+                           where:
+                              - <ks> is a [string] representing the keyspace name this
+                                UDT is part of.
+                              - <udt_name> is a [string] representing the UDT name.
+                              - <n> is a [short] representing the number of fields of
+                                the UDT, and thus the number of <name_i><type_i> pair
+                                following
+                              - <name_i> is a [string] representing the name of the
+                                i_th field of the UDT.
+                              - <type_i> is an [option] representing the type of the
+                                i_th field of the UDT.
+
+    - <rows_count> is an [int] representing the number of rows present in this
+      result. Those rows are serialized in the <rows_content> part.
+    - <rows_content> is composed of <row_1>...<row_m> where m is <rows_count>.
+      Each <row_i> is composed of <value_1>...<value_n> where n is
+      <columns_count> and where <value_j> is a [bytes] representing the value
+      returned for the jth column of the ith row. In other words, <rows_content>
+      is composed of (<rows_count> * <columns_count>) [bytes].
+
+
+4.2.5.3. Set_keyspace
+
+  The result to a `use` query. The body (after the kind [int]) is a single
+  [string] indicating the name of the keyspace that has been set.
+
+
+4.2.5.4. Prepared
+
+  The result to a PREPARE message. The rest of the body of a Prepared result is:
+    <id><metadata><result_metadata>
+  where:
+    - <id> is [short bytes] representing the prepared query ID.
+    - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2; you
+      can however assume that the Has_more_pages flag is always off) and
+      is the specification for the variable bound in this prepare statement.
+    - <result_metadata> is defined exactly as <metadata> but correspond to the
+      metadata for the resultSet that executing this query will yield. Note that
+      <result_metadata> may be empty (have the No_metadata flag and 0 columns, See
+      section 4.2.5.2) and will be for any query that is not a Select. There is
+      in fact never a guarantee that this will be non-empty, so clients should
+      protect themselves accordingly. The presence of this information is an
+      optimization that allows to later execute the statement that has been
+      prepared without requesting the metadata (Skip_metadata flag in EXECUTE).
+      Clients can safely discard this metadata if they do not want to take
+      advantage of that optimization.
+
+  Note that the prepared query ID returned is global to the node on which the
+  query has been prepared. It can be used on any connection to that node until
+  the node is restarted (after which the query must be reprepared).
+
+4.2.5.5. Schema_change
+
+  The result to a schema altering query (creation/update/drop of a
+  keyspace/table/index). The body (after the kind [int]) is composed of 3
+  [string]:
+    <change><keyspace><table>
+  where:
+    - <change> describes the type of change that has occurred. It can be one of
+      "CREATED", "UPDATED" or "DROPPED".
+    - <keyspace> is the name of the affected keyspace or the keyspace of the
+      affected table.
+    - <table> is the name of the affected table. <table> will be empty (i.e.
+      the empty string "") if the change was affecting a keyspace and not a
+      table.
+
+  Note that queries to create and drop an index are considered as changes
+  updating the table the index is on.
+
+
+4.2.6. EVENT
+
+  An event pushed by the server. A client will only receive events for the
+  types it has REGISTERed to. The body of an EVENT message will start with a
+  [string] representing the event type. The rest of the message depends on the
+  event type. The valid event types are:
+    - "TOPOLOGY_CHANGE": events related to change in the cluster topology.
+      Currently, events are sent when new nodes are added to the cluster, and
+      when nodes are removed. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of change ("NEW_NODE" or "REMOVED_NODE") followed by the address of
+      the new/removed node.
+    - "STATUS_CHANGE": events related to change of node status. Currently,
+      up/down events are sent. The body of the message (after the event type)
+      consists of a [string] and an [inet], corresponding respectively to the
+      type of status change ("UP" or "DOWN") followed by the address of the
+      concerned node.
+    - "SCHEMA_CHANGE": events related to schema change. After the event type,
+      the rest of the message will be <change_type><target><options> where:
+        - <change_type> is the type of change involved. It will be one of
+          "CREATED", "UPDATED" or "DROPPED".
+        - <target> can be one of "KEYSPACE", "TABLE" or "TYPE" and describes
+          what has been modified ("TYPE" stands for modifications related to
+          user types).
+        - <options> depends on the preceding <target>. If <target> is
+          "KEYSPACE", then <options> will be a single [string] representing the
+          keyspace changed. Otherwise, if <target> is "TABLE" or "TYPE", then
+          <options> will be 2 [string]: the first one will be the keyspace
+          containing the affected object, and the second one will be the name
+          of said affected object (so either the table name or the user type
+          name).
+
+  All EVENT messages have a streamId of -1 (Section 2.3).
+
+  Please note that "NEW_NODE" and "UP" events are sent based on internal Gossip
+  communication and as such may be sent a short delay before the binary
+  protocol server on the newly up node is fully started. Clients are thus
+  advised to wait a short time before trying to connect to the node (1 second
+  should be enough), otherwise they may experience a connection refusal at
+  first.
+
+4.2.7. AUTH_CHALLENGE
+
+  A server authentication challenge (see AUTH_RESPONSE (Section 4.1.2) for more
+  details).
+
+  The body of this message is a single [bytes] token. The details of what this
+  token contains (and when it can be null/empty, if ever) depends on the actual
+  authenticator used.
+
+  Clients are expected to answer the server challenge by an AUTH_RESPONSE
+  message.
+
+4.2.8. AUTH_SUCCESS
+
+  Indicates the success of the authentication phase. See Section 4.2.3 for more
+  details.
+
+  The body of this message is a single [bytes] token holding final information
+  from the server that the client may require to finish the authentication
+  process. What that token contains and whether it can be null depends on the
+  actual authenticator used.
+
+
+5. Compression
+
+  Frame compression is supported by the protocol, but then only the frame body
+  is compressed (the frame header should never be compressed).
+
+  Before being used, client and server must agree on a compression algorithm to
+  use, which is done in the STARTUP message. As a consequence, a STARTUP message
+  must never be compressed. However, once the STARTUP frame has been received
+  by the server, all subsequent frames can be compressed (including the response
+  to the STARTUP request). Frames do not have to be compressed however, even if
+  compression has been agreed upon (a server may only compress frames above a
+  certain size at its discretion). A frame body should be compressed if and only
+  if the compressed flag (see Section 2.2) is set.
+
+  As of version 3 of the protocol, the following compressions are available:
+    - lz4 (https://code.google.com/p/lz4/). In that case, note that the first 4
+      bytes of the body will be the uncompressed length (followed by the
+      compressed bytes).
+    - snappy (https://code.google.com/p/snappy/). This compression might not be
+      available as it depends on a native lib (server-side) that might not be
+      available on some installations.
+
+
+6. Collection types
+
+  This section describes the serialization format for the collection types:
+  list, map and set. This serialization format is both useful to decode values
+  returned in RESULT messages but also to encode values for EXECUTE ones.
+
+  The serialization formats are:
+     List: a [int] n indicating the size of the list, followed by n elements.
+           Each element is [bytes] representing the serialized element
+           value.
+     Map: a [int] n indicating the size of the map, followed by n entries.
+          Each entry is composed of two [bytes] representing the key and
+          the value of the entry map.
+     Set: a [int] n indicating the size of the set, followed by n elements.
+          Each element is [bytes] representing the serialized element
+          value.
+
+
+7. User defined types
+
+  This section describes the serialization format for User defined types (UDT) values.
+  UDT values are the values of the User Defined Types as defined in section 4.2.5.2.
+
+  A UDT value is a [short] n indicating the number of fields of the UDT value,
+  followed by n elements. Each element is a [short bytes] representing the serialized
+  field.
+
+
+8. Result paging
+
+  The protocol allows for paging the result of queries. For that, the QUERY and
+  EXECUTE messages have a <result_page_size> value that indicates the desired
+  page size in CQL3 rows.
+
+  If a positive value is provided for <result_page_size>, the result set of the
+  RESULT message returned for the query will contain at most the
+  <result_page_size> first rows of the query result. If that first page of result
+  contains the full result set for the query, the RESULT message (of kind `Rows`)
+  will have the Has_more_pages flag *not* set. However, if some results are not
+  part of the first response, the Has_more_pages flag will be set and the result
+  will contain a <paging_state> value. In that case, the <paging_state> value
+  should be used in a QUERY or EXECUTE message (that has the *same* query as
+  the original one or the behavior is undefined) to retrieve the next page of
+  results.
+
+  Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+  support paging. For other types of queries, the <result_page_size> value is
+  ignored.
+
+  Note to client implementors:
+  - While <result_page_size> can be as low as 1, it will likely be detrimental
+    to performance to pick a value too low. A value below 100 is probably too
+    low for most use cases.
+  - Clients should not rely on the actual size of the result set returned to
+    decide if there are more results to fetch or not. Instead, they should always
+    check the Has_more_pages flag (unless they did not enable paging for the query
+    obviously). Clients should also not assert that no result will have more than
+    <result_page_size> results. While the current implementation always respects
+    the exact value of <result_page_size>, we reserve the right to return
+    slightly smaller or bigger pages in the future for performance reasons.
+
+
+9. Error codes
+
+  The supported error codes are described below:
+    0x0000    Server error: something unexpected happened. This indicates a
+              server-side bug.
+    0x000A    Protocol error: some client message triggered a protocol
+              violation (for instance a QUERY message is sent before a STARTUP
+              one has been sent)
+    0x0100    Bad credentials: CREDENTIALS request failed because Cassandra
+              did not accept the provided credentials.
+
+    0x1000    Unavailable exception. The rest of the ERROR message body will be
+                <cl><required><alive>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <required> is an [int] representing the number of nodes that
+                           should be alive to respect <cl>
+                <alive> is an [int] representing the number of replicas that
+                        were known to be alive when the request has been
+                        processed (since an unavailable exception has been
+                        triggered, there will be <alive> < <required>)
+    0x1001    Overloaded: the request cannot be processed because the
+              coordinator node is overloaded
+    0x1002    Is_bootstrapping: the request was a read request but the
+              coordinator node is bootstrapping
+    0x1003    Truncate_error: error during a truncation.
+    0x1100    Write_timeout: Timeout exception during a write request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><writeType>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           acknowledged the request.
+                <blockfor> is the number of replicas whose acknowledgement is
+                           required to achieve <cl>.
+                <writeType> is a [string] that describes the type of the write
+                            that timed out. The value of that string can be one
+                            of:
+                             - "SIMPLE": the write was a non-batched
+                               non-counter write.
+                             - "BATCH": the write was a (logged) batch write.
+                               If this type is received, it means the batch log
+                               has been successfully written (otherwise a
+                               "BATCH_LOG" type would have been sent instead).
+                             - "UNLOGGED_BATCH": the write was an unlogged
+                               batch. No batch log write has been attempted.
+                             - "COUNTER": the write was a counter write
+                               (batched or not).
+                             - "BATCH_LOG": the timeout occurred during the
+                               write to the batch log when a (logged) batch
+                               write was requested.
+    0x1200    Read_timeout: Timeout exception during a read request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><data_present>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           answered the request.
+                <blockfor> is the number of replicas whose response is
+                           required to achieve <cl>. Please note that it is
+                           possible to have <received> >= <blockfor> if
+                           <data_present> is false. And also in the (unlikely)
+                           case where <cl> is achieved but the coordinator node
+                           times out while waiting for read-repair
+                           acknowledgement.
+                <data_present> is a single byte. If its value is 0, it means
+                               the replica that was asked for data has not
+                               responded. Otherwise, the value is != 0.
+
+    0x2000    Syntax_error: The submitted query has a syntax error.
+    0x2100    Unauthorized: The logged user doesn't have the right to perform
+              the query.
+    0x2200    Invalid: The query is syntactically correct but invalid.
+    0x2300    Config_error: The query is invalid because of some configuration issue
+    0x2400    Already_exists: The query attempted to create a keyspace or a
+              table that was already existing. The rest of the ERROR message
+              body will be <ks><table> where:
+                <ks> is a [string] representing either the keyspace that
+                     already exists, or the keyspace in which the table that
+                     already exists is.
+                <table> is a [string] representing the name of the table that
+                        already exists. If the query was attempting to create a
+                        keyspace, <table> will be present but will be the empty
+                        string.
+    0x2500    Unprepared: Can be thrown while a prepared statement tries to be
+              executed if the provided prepared statement ID is not known by
+              this host. The rest of the ERROR message body will be [short
+              bytes] representing the unknown ID.
+
+10. Changes from v2
+  * BATCH messages now have <flags> (like QUERY and EXECUTE) and a corresponding optional
+    <serial_consistency> parameter (see Section 4.1.7).
+  * User Defined Types have been added to ResultSet metadata (see 4.2.5.2) and a new section
+    on the serialization format of UDT values has been added to the documentation
+    (Section 7).
+  * The serialization format for collections has changed (both the collection size and
+    the length of each argument are now 4 bytes long). See Section 6.
+  * QUERY, EXECUTE and BATCH messages can now optionally provide the default timestamp for the query.
+    As this feature is optionally enabled by clients, implementing it is at the discretion of the
+    client.
+  * QUERY, EXECUTE and BATCH messages can now optionally provide the names for the values of the
+    query. As this feature is optionally enabled by clients, implementing it is at the discretion of the
+    client.
+  * The format of "SCHEMA_CHANGE" notifications has been modified, and now includes changes related to
+    user types.
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 237fc99..528a54a 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -258,8 +258,8 @@ public class Auth
         try
         {
             ResultMessage.Rows rows = selectUserStatement.execute(QueryState.forInternalCalls(),
-                                                                  new QueryOptions(consistencyForUser(username),
-                                                                                   Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                  QueryOptions.forInternalCalls(consistencyForUser(username),
+                                                                                                Lists.newArrayList(ByteBufferUtil.bytes(username))));
             return UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)
@@ -287,6 +287,10 @@ public class Auth
             DatabaseDescriptor.getAuthorizer().revokeAll(DataResource.columnFamily(ksName, cfName));
         }
 
+        public void onDropUserType(String ksName, String userType)
+        {
+        }
+
         public void onCreateKeyspace(String ksName)
         {
         }
@@ -295,6 +299,10 @@ public class Auth
         {
         }
 
+        public void onCreateUserType(String ksName, String userType)
+        {
+        }
+
         public void onUpdateKeyspace(String ksName)
         {
         }
@@ -302,5 +310,9 @@ public class Auth
         public void onUpdateColumnFamily(String ksName, String cfName)
         {
         }
+
+        public void onUpdateUserType(String ksName, String userType)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 85d2b16..b37dee2 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -71,9 +71,9 @@ public class CassandraAuthorizer implements IAuthorizer
         try
         {
             ResultMessage.Rows rows = authorizeStatement.execute(QueryState.forInternalCalls(),
-                                                                 new QueryOptions(ConsistencyLevel.ONE,
-                                                                                  Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
-                                                                                                     ByteBufferUtil.bytes(resource.getName()))));
+                                                                 QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
+                                                                                               Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
+                                                                                                                  ByteBufferUtil.bytes(resource.getName()))));
             result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 1567bde..9256c2b 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -106,8 +106,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
         try
         {
             ResultMessage.Rows rows = authenticateStatement.execute(QueryState.forInternalCalls(),
-                                                                    new QueryOptions(consistencyForUser(username),
-                                                                                     Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                    QueryOptions.forInternalCalls(consistencyForUser(username),
+                                                                                                  Lists.newArrayList(ByteBufferUtil.bytes(username))));
             result = UntypedResultSet.create(rows.result);
         }
         catch (RequestValidationException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 7c33e5b..435757b 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -56,12 +56,12 @@ public class Attributes
         return timeToLive != null;
     }
 
-    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+    public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
         if (timestamp == null)
             return now;
 
-        ByteBuffer tval = timestamp.bindAndGet(variables);
+        ByteBuffer tval = timestamp.bindAndGet(options);
         if (tval == null)
             throw new InvalidRequestException("Invalid null value of timestamp");
 
@@ -77,12 +77,12 @@ public class Attributes
         return LongType.instance.compose(tval);
     }
 
-    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+    public int getTimeToLive(QueryOptions options) throws InvalidRequestException
     {
         if (timeToLive == null)
             return 0;
 
-        ByteBuffer tval = timeToLive.bindAndGet(variables);
+        ByteBuffer tval = timeToLive.bindAndGet(options);
         if (tval == null)
             throw new InvalidRequestException("Invalid null value of TTL");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index cbf5e92..2bb8071 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -18,38 +18,95 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.PagingState;
 
-/**
- * Options for a batch (at the protocol level) queries.
- */
-public class BatchQueryOptions
+public abstract class BatchQueryOptions
 {
-    private final ConsistencyLevel consistency;
-    private final List<List<ByteBuffer>> values;
+    public static BatchQueryOptions DEFAULT = withoutPerStatementVariables(QueryOptions.DEFAULT);
+
+    protected final QueryOptions wrapped;
     private final List<Object> queryOrIdList;
 
-    public BatchQueryOptions(ConsistencyLevel cl, List<List<ByteBuffer>> values, List<Object> queryOrIdList)
+    protected BatchQueryOptions(QueryOptions wrapped, List<Object> queryOrIdList)
     {
-        this.consistency = cl;
-        this.values = values;
+        this.wrapped = wrapped;
         this.queryOrIdList = queryOrIdList;
     }
 
+    public static BatchQueryOptions withoutPerStatementVariables(QueryOptions options)
+    {
+        return new WithoutPerStatementVariables(options, Collections.<Object>emptyList());
+    }
+
+    public static BatchQueryOptions withPerStatementVariables(QueryOptions options, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+    {
+        return new WithPerStatementVariables(options, variables, queryOrIdList);
+    }
+
+    public abstract QueryOptions forStatement(int i);
+
     public ConsistencyLevel getConsistency()
     {
-        return consistency;
+        return wrapped.getConsistency();
     }
 
-    public List<List<ByteBuffer>> getValues()
+    public ConsistencyLevel getSerialConsistency()
     {
-        return values;
+        return wrapped.getSerialConsistency();
     }
 
     public List<Object> getQueryOrIdList()
     {
         return queryOrIdList;
     }
+
+    public long getTimestamp(QueryState state)
+    {
+        return wrapped.getTimestamp(state);
+    }
+
+    private static class WithoutPerStatementVariables extends BatchQueryOptions
+    {
+        private WithoutPerStatementVariables(QueryOptions wrapped, List<Object> queryOrIdList)
+        {
+            super(wrapped, queryOrIdList);
+        }
+
+        public QueryOptions forStatement(int i)
+        {
+            return wrapped;
+        }
+    }
+
+    private static class WithPerStatementVariables extends BatchQueryOptions
+    {
+        private final List<QueryOptions> perStatementOptions;
+
+        private WithPerStatementVariables(QueryOptions wrapped, List<List<ByteBuffer>> variables, List<Object> queryOrIdList)
+        {
+            super(wrapped, queryOrIdList);
+            this.perStatementOptions = new ArrayList<>(variables.size());
+            for (final List<ByteBuffer> vars : variables)
+            {
+                perStatementOptions.add(new QueryOptions.QueryOptionsWrapper(wrapped)
+                {
+                    public List<ByteBuffer> getValues()
+                    {
+                        return vars;
+                    }
+                });
+            }
+        }
+
+        public QueryOptions forStatement(int i)
+        {
+            return perStatementOptions.get(i);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
index 67e7174..c2617fe 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -74,21 +74,21 @@ public class ColumnCondition
         value.collectMarkerSpecification(boundNames);
     }
 
-    public ColumnCondition.WithVariables with(List<ByteBuffer> variables)
+    public ColumnCondition.WithOptions with(QueryOptions options)
     {
-        return new WithVariables(variables);
+        return new WithOptions(options);
     }
 
-    public class WithVariables
+    public class WithOptions
     {
-        private final List<ByteBuffer> variables;
+        private final QueryOptions options;
 
-        private WithVariables(List<ByteBuffer> variables)
+        private WithOptions(QueryOptions options)
         {
-            this.variables = variables;
+            this.options = options;
         }
 
-        public boolean equalsTo(WithVariables other) throws InvalidRequestException
+        public boolean equalsTo(WithOptions other) throws InvalidRequestException
         {
             if (!column().equals(other.column()))
                 return false;
@@ -103,11 +103,11 @@ public class ColumnCondition
                                            ? Int32Type.instance
                                            : ((MapType)column.type).keys;
 
-                if (comparator.compare(collectionElement().bindAndGet(variables), other.collectionElement().bindAndGet(variables)) != 0)
+                if (comparator.compare(collectionElement().bindAndGet(options), other.collectionElement().bindAndGet(options)) != 0)
                     return false;
             }
 
-            return value().bindAndGet(variables).equals(other.value().bindAndGet(other.variables));
+            return value().bindAndGet(options).equals(other.value().bindAndGet(other.options));
         }
 
         private ColumnDefinition column()
@@ -127,7 +127,7 @@ public class ColumnCondition
 
         public ByteBuffer getCollectionElementValue() throws InvalidRequestException
         {
-            return collectionElement == null ? null : collectionElement.bindAndGet(variables);
+            return collectionElement == null ? null : collectionElement.bindAndGet(options);
         }
 
         /**
@@ -140,7 +140,7 @@ public class ColumnCondition
 
             assert collectionElement == null;
             Cell c = current.getColumn(current.metadata().comparator.create(rowPrefix, column));
-            ByteBuffer v = value.bindAndGet(variables);
+            ByteBuffer v = value.bindAndGet(options);
             return v == null
                  ? c == null || !c.isLive(now)
                  : c != null && c.isLive(now) && c.value().equals(v);
@@ -148,15 +148,15 @@ public class ColumnCondition
 
         private boolean collectionAppliesTo(CollectionType type, Composite rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
         {
-            Term.Terminal v = value.bind(variables);
+            Term.Terminal v = value.bind(options);
 
             // For map element access, we won't iterate over the collection, so deal with that first. In other case, we do.
             if (collectionElement != null && type instanceof MapType)
             {
-                ByteBuffer e = collectionElement.bindAndGet(variables);
+                ByteBuffer e = collectionElement.bindAndGet(options);
                 if (e == null)
                     throw new InvalidRequestException("Invalid null value for map access");
-                return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(), now);
+                return mapElementAppliesTo((MapType)type, current, rowPrefix, e, v.get(options), now);
             }
 
             CellName name = current.metadata().comparator.create(rowPrefix, column);
@@ -178,11 +178,11 @@ public class ColumnCondition
             if (collectionElement != null)
             {
                 assert type instanceof ListType;
-                ByteBuffer e = collectionElement.bindAndGet(variables);
+                ByteBuffer e = collectionElement.bindAndGet(options);
                 if (e == null)
                     throw new InvalidRequestException("Invalid null value for list access");
 
-                return listElementAppliesTo((ListType)type, iter, e, v.get());
+                return listElementAppliesTo((ListType)type, iter, e, v.get(options));
             }
 
             switch (type.kind)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 3b7b4c4..5af84f0 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -53,7 +53,7 @@ public abstract class Constants
         private final Term.Terminal NULL_VALUE = new Value(null)
         {
             @Override
-            public Terminal bind(List<ByteBuffer> values)
+            public Terminal bind(QueryOptions options)
             {
                 // We return null because that makes life easier for collections
                 return null;
@@ -246,13 +246,13 @@ public abstract class Constants
             this.bytes = bytes;
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
             return bytes;
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> values)
+        public ByteBuffer bindAndGet(QueryOptions options)
         {
             return bytes;
         }
@@ -267,11 +267,11 @@ public abstract class Constants
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
             try
             {
-                ByteBuffer value = values.get(bindIndex);
+                ByteBuffer value = options.getValues().get(bindIndex);
                 if (value != null)
                     receiver.type.validate(value);
                 return value;
@@ -282,9 +282,9 @@ public abstract class Constants
             }
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer bytes = bindAndGet(values);
+            ByteBuffer bytes = bindAndGet(options);
             return bytes == null ? null : new Constants.Value(bytes);
         }
     }
@@ -299,7 +299,7 @@ public abstract class Constants
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
             CellName cname = cf.getComparator().create(prefix, column);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer value = t.bindAndGet(params.options);
             cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
         }
     }
@@ -313,7 +313,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.variables);
+            ByteBuffer bytes = t.bindAndGet(params.options);
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
             long increment = ByteBufferUtil.toLong(bytes);
@@ -331,7 +331,7 @@ public abstract class Constants
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.variables);
+            ByteBuffer bytes = t.bindAndGet(params.options);
             if (bytes == null)
                 throw new InvalidRequestException("Invalid null value for counter increment");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 580a2c9..751ccdb 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -83,7 +83,7 @@ public abstract class Lists
                 values.add(t);
             }
             DelayedValue value = new DelayedValue(values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -128,13 +128,13 @@ public abstract class Lists
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, ListType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, ListType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                List<?> l = (List<?>)type.compose(value);
+                List<?> l = (List<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 List<ByteBuffer> elements = new ArrayList<ByteBuffer>(l.size());
                 for (Object element : l)
                     elements.add(type.elements.decompose(element));
@@ -146,9 +146,9 @@ public abstract class Lists
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
-            return CollectionType.pack(elements, elements.size());
+            return CollectionSerializer.pack(elements, elements.size(), options.getProtocolVersion());
         }
     }
 
@@ -180,12 +180,12 @@ public abstract class Lists
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(elements.size());
             for (Term t : elements)
             {
-                ByteBuffer bytes = t.bindAndGet(values);
+                ByteBuffer bytes = t.bindAndGet(options);
 
                 if (bytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
@@ -210,10 +210,10 @@ public abstract class Lists
             assert receiver.type instanceof ListType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (ListType)receiver.type, options.getProtocolVersion());
 
         }
     }
@@ -299,8 +299,8 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer index = idx.bindAndGet(params.variables);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer index = idx.bindAndGet(params.options);
+            ByteBuffer value = t.bindAndGet(params.options);
 
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list index");
@@ -342,7 +342,7 @@ public abstract class Lists
 
         static void doAppend(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             // If we append null, do nothing. Note that for Setter, we've
             // already removed the previous value so we're good here too
             if (value == null)
@@ -367,7 +367,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -403,7 +403,7 @@ public abstract class Lists
             if (existingList.isEmpty())
                 return;
 
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -437,7 +437,7 @@ public abstract class Lists
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal index = t.bind(params.variables);
+            Term.Terminal index = t.bind(params.options);
             if (index == null)
                 throw new InvalidRequestException("Invalid null value for list index");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index d113b57..0c4980c 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -86,7 +86,7 @@ public abstract class Maps
                 values.put(k, v);
             }
             DelayedValue value = new DelayedValue(((MapType)receiver.type).keys, values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -142,13 +142,13 @@ public abstract class Maps
             this.map = map;
         }
 
-        public static Value fromSerialized(ByteBuffer value, MapType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, MapType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Map<?, ?> m = (Map<?, ?>)type.compose(value);
+                Map<?, ?> m = (Map<?, ?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 Map<ByteBuffer, ByteBuffer> map = new LinkedHashMap<ByteBuffer, ByteBuffer>(m.size());
                 for (Map.Entry<?, ?> entry : m.entrySet())
                     map.put(type.keys.decompose(entry.getKey()), type.values.decompose(entry.getValue()));
@@ -160,7 +160,7 @@ public abstract class Maps
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(2 * map.size());
             for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet())
@@ -168,7 +168,7 @@ public abstract class Maps
                 buffers.add(entry.getKey());
                 buffers.add(entry.getValue());
             }
-            return CollectionType.pack(buffers, map.size());
+            return CollectionSerializer.pack(buffers, map.size(), options.getProtocolVersion());
         }
     }
 
@@ -194,13 +194,13 @@ public abstract class Maps
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             Map<ByteBuffer, ByteBuffer> buffers = new TreeMap<ByteBuffer, ByteBuffer>(comparator);
             for (Map.Entry<Term, Term> entry : elements.entrySet())
             {
                 // We don't support values > 64K because the serialization format encode the length as an unsigned short.
-                ByteBuffer keyBytes = entry.getKey().bindAndGet(values);
+                ByteBuffer keyBytes = entry.getKey().bindAndGet(options);
                 if (keyBytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
                 if (keyBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -208,7 +208,7 @@ public abstract class Maps
                                                                     FBUtilities.MAX_UNSIGNED_SHORT,
                                                                     keyBytes.remaining()));
 
-                ByteBuffer valueBytes = entry.getValue().bindAndGet(values);
+                ByteBuffer valueBytes = entry.getValue().bindAndGet(options);
                 if (valueBytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
                 if (valueBytes.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -230,10 +230,10 @@ public abstract class Maps
             assert receiver.type instanceof MapType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (MapType)receiver.type, options.getProtocolVersion());
         }
     }
 
@@ -272,8 +272,8 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer key = k.bindAndGet(params.variables);
-            ByteBuffer value = t.bindAndGet(params.variables);
+            ByteBuffer key = k.bindAndGet(params.options);
+            ByteBuffer value = t.bindAndGet(params.options);
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
 
@@ -310,7 +310,7 @@ public abstract class Maps
 
         static void doPut(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
             assert value instanceof Maps.Value;
@@ -333,7 +333,7 @@ public abstract class Maps
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal key = t.bind(params.variables);
+            Term.Terminal key = t.bind(params.options);
             if (key == null)
                 throw new InvalidRequestException("Invalid null map key");
             assert key instanceof Constants.Value;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 4d72333..2f28812 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.service.QueryState;
@@ -28,7 +29,7 @@ public interface QueryHandler
 {
     public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
     public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException;
-    public CQLStatement getPrepared(MD5Digest id);
+    public ParsedStatement.Prepared getPrepared(MD5Digest id);
     public CQLStatement getPreparedForThrift(Integer id);
     public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException;
     public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException;


[5/5] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b4d9803
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b4d9803
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b4d9803

Branch: refs/heads/trunk
Commit: 6b4d980357de573f9128c3a065a6d11c54a3b571
Parents: ad34247 9872b74
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Apr 30 20:22:53 2014 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Apr 30 20:22:53 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   2 +-
 doc/native_protocol_v3.spec                     | 911 +++++++++++++++++++
 src/java/org/apache/cassandra/auth/Auth.java    |  16 +-
 .../cassandra/auth/CassandraAuthorizer.java     |   6 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   4 +-
 .../org/apache/cassandra/cql3/Attributes.java   |   8 +-
 .../cassandra/cql3/BatchQueryOptions.java       |  81 +-
 .../apache/cassandra/cql3/ColumnCondition.java  |  32 +-
 .../org/apache/cassandra/cql3/Constants.java    |  20 +-
 src/java/org/apache/cassandra/cql3/Lists.java   |  34 +-
 src/java/org/apache/cassandra/cql3/Maps.java    |  32 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |   3 +-
 .../org/apache/cassandra/cql3/QueryOptions.java | 283 ++++--
 .../apache/cassandra/cql3/QueryProcessor.java   |  29 +-
 .../org/apache/cassandra/cql3/ResultSet.java    |   6 +-
 src/java/org/apache/cassandra/cql3/Sets.java    |  26 +-
 src/java/org/apache/cassandra/cql3/Term.java    |  18 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   6 +-
 .../org/apache/cassandra/cql3/UserTypes.java    |  18 +-
 .../cassandra/cql3/functions/FunctionCall.java  |  20 +-
 .../cql3/statements/BatchStatement.java         |  95 +-
 .../cql3/statements/CQL3CasConditions.java      |  14 +-
 .../cql3/statements/ModificationStatement.java  |  63 +-
 .../cassandra/cql3/statements/Restriction.java  |  28 +-
 .../cql3/statements/SelectStatement.java        | 150 +--
 .../org/apache/cassandra/db/DefsTables.java     |  19 +-
 .../cassandra/db/marshal/CollectionType.java    |  29 +-
 .../apache/cassandra/db/marshal/ListType.java   |  12 +-
 .../apache/cassandra/db/marshal/MapType.java    |  21 +-
 .../apache/cassandra/db/marshal/SetType.java    |  15 +-
 .../apache/cassandra/db/marshal/UserType.java   |   5 +
 .../hadoop/pig/AbstractCassandraStorage.java    |  11 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  11 +-
 .../serializers/CollectionSerializer.java       | 106 ++-
 .../cassandra/serializers/ListSerializer.java   |  39 +-
 .../cassandra/serializers/MapSerializer.java    |  48 +-
 .../cassandra/serializers/SetSerializer.java    |  39 +-
 .../cassandra/service/IMigrationListener.java   |   3 +
 .../cassandra/service/MigrationManager.java     |  18 +
 .../cassandra/thrift/CassandraServer.java       |   4 +-
 .../org/apache/cassandra/transport/CBUtil.java  |  17 +
 .../org/apache/cassandra/transport/Client.java  |   4 +-
 .../apache/cassandra/transport/DataType.java    |  79 +-
 .../org/apache/cassandra/transport/Event.java   | 158 +++-
 .../apache/cassandra/transport/OptionCodec.java |  28 +-
 .../org/apache/cassandra/transport/Server.java  |  21 +-
 .../cassandra/transport/SimpleClient.java       |   4 +-
 .../transport/messages/BatchMessage.java        |  53 +-
 .../transport/messages/EventMessage.java        |   6 +-
 .../transport/messages/ExecuteMessage.java      |   5 +-
 .../cassandra/transport/SerDeserTest.java       | 217 +++++
 52 files changed, 2232 insertions(+), 646 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/build.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index db99060,40c45af..9a5ac92
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -51,13 -51,14 +51,13 @@@ public class QueryProcessor implements 
      private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class);
      private static final MemoryMeter meter = new MemoryMeter().withGuessing(MemoryMeter.Guess.FALLBACK_BEST);
      private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
 -    private static final int MAX_CACHE_PREPARED_COUNT = 10000;
  
-     private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+     private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
      {
          @Override
-         public int weightOf(MD5Digest key, CQLStatement value)
+         public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
          {
-             return Ints.checkedCast(measure(key) + measure(value));
+             return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
          }
      };
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b4d9803/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------


[3/5] Native protocol v3

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 76b1eeb..12accaf 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -25,120 +26,244 @@ import java.util.List;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.CBCodec;
 import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Options for a query.
  */
-public class QueryOptions
+public abstract class QueryOptions
 {
-    public static final QueryOptions DEFAULT = new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+    public static final QueryOptions DEFAULT = new DefaultQueryOptions(ConsistencyLevel.ONE,
+                                                                       Collections.<ByteBuffer>emptyList(),
+                                                                       false,
+                                                                       SpecificOptions.DEFAULT,
+                                                                       3);
 
     public static final CBCodec<QueryOptions> codec = new Codec();
 
-    private final ConsistencyLevel consistency;
-    private final List<ByteBuffer> values;
-    private final boolean skipMetadata;
-
-    private final SpecificOptions options;
+    public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+    {
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+    }
 
-    // The protocol version of incoming queries. This is set during deserializaion and will be 0
-    // if the QueryOptions does not come from a user message (or come from thrift).
-    private final transient int protocolVersion;
+    public static QueryOptions fromProtocolV2(ConsistencyLevel consistency, List<ByteBuffer> values)
+    {
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 2);
+    }
 
-    public QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values)
+    public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
     {
-        this(consistency, values, false, SpecificOptions.DEFAULT, 0);
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 0);
     }
 
-    public QueryOptions(ConsistencyLevel consistency,
-                        List<ByteBuffer> values,
-                        boolean skipMetadata,
-                        int pageSize,
-                        PagingState pagingState,
-                        ConsistencyLevel serialConsistency)
+    public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
     {
-        this(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency), 0);
+        return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
     }
 
-    private QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+    public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency)
     {
-        this.consistency = consistency;
-        this.values = values;
-        this.skipMetadata = skipMetadata;
-        this.options = options;
-        this.protocolVersion = protocolVersion;
+        return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), 0);
     }
 
-    public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+    public abstract ConsistencyLevel getConsistency();
+    public abstract List<ByteBuffer> getValues();
+    public abstract boolean skipMetadata();
+
+    /**  The pageSize for this query. Will be <= 0 if not relevant for the query.  */
+    public int getPageSize()
     {
-        return new QueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+        return getSpecificOptions().pageSize;
     }
 
-    public ConsistencyLevel getConsistency()
+    /** The paging state for this query, or null if not relevant. */
+    public PagingState getPagingState()
     {
-        return consistency;
+        return getSpecificOptions().state;
     }
 
-    public List<ByteBuffer> getValues()
+    /**  Serial consistency for conditional updates. */
+    public ConsistencyLevel getSerialConsistency()
     {
-        return values;
+        return getSpecificOptions().serialConsistency;
     }
 
-    public boolean skipMetadata()
+    public long getTimestamp(QueryState state)
     {
-        return skipMetadata;
+        long tstamp = getSpecificOptions().timestamp;
+        return tstamp >= 0 ? tstamp : state.getTimestamp();
     }
 
     /**
-     * The pageSize for this query. Will be <= 0 if not relevant for the query.
+     * The protocol version for the query. Will be 3 if the object doesn't come from
+     * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
      */
-    public int getPageSize()
+    public abstract int getProtocolVersion();
+
+    public abstract QueryOptions withProtocolVersion(int version);
+
+    // Mainly for the sake of BatchQueryOptions
+    abstract SpecificOptions getSpecificOptions();
+
+    public QueryOptions prepare(List<ColumnSpecification> specs)
     {
-        return options.pageSize;
+        return this;
     }
 
-    /**
-     * The paging state for this query, or null if not relevant.
-     */
-    public PagingState getPagingState()
+    static class DefaultQueryOptions extends QueryOptions
     {
-        return options.state;
+        private final ConsistencyLevel consistency;
+        private final List<ByteBuffer> values;
+        private final boolean skipMetadata;
+
+        private final SpecificOptions options;
+
+        // The protocol version of incoming queries. This is set during deserialization and will be 0
+        // if the QueryOptions does not come from a user message (or comes from thrift).
+        private final transient int protocolVersion;
+
+        DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+        {
+            this.consistency = consistency;
+            this.values = values;
+            this.skipMetadata = skipMetadata;
+            this.options = options;
+            this.protocolVersion = protocolVersion;
+        }
+
+        public QueryOptions withProtocolVersion(int version)
+        {
+            return new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+        }
+
+        public ConsistencyLevel getConsistency()
+        {
+            return consistency;
+        }
+
+        public List<ByteBuffer> getValues()
+        {
+            return values;
+        }
+
+        public boolean skipMetadata()
+        {
+            return skipMetadata;
+        }
+
+        public int getProtocolVersion()
+        {
+            return protocolVersion;
+        }
+
+        SpecificOptions getSpecificOptions()
+        {
+            return options;
+        }
     }
 
-    /**
-     * Serial consistency for conditional updates.
-     */
-    public ConsistencyLevel getSerialConsistency()
+    static abstract class QueryOptionsWrapper extends QueryOptions
     {
-        return options.serialConsistency;
+        protected final QueryOptions wrapped;
+
+        QueryOptionsWrapper(QueryOptions wrapped)
+        {
+            this.wrapped = wrapped;
+        }
+
+        public ConsistencyLevel getConsistency()
+        {
+            return wrapped.getConsistency();
+        }
+
+        public boolean skipMetadata()
+        {
+            return wrapped.skipMetadata();
+        }
+
+        public int getProtocolVersion()
+        {
+            return wrapped.getProtocolVersion();
+        }
+
+        SpecificOptions getSpecificOptions()
+        {
+            return wrapped.getSpecificOptions();
+        }
+
+        @Override
+        public QueryOptions prepare(List<ColumnSpecification> specs)
+        {
+            wrapped.prepare(specs);
+            return this;
+        }
+
+        public QueryOptions withProtocolVersion(int version)
+        {
+            return new DefaultQueryOptions(getConsistency(), getValues(), skipMetadata(),  getSpecificOptions(), version);
+        }
     }
 
-    /**
-     * The protocol version for the query. Will be 0 if the object don't come from
-     * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
-     */
-    public int getProtocolVersion()
+    static class OptionsWithNames extends QueryOptionsWrapper
     {
-        return protocolVersion;
+        private final List<String> names;
+        private List<ByteBuffer> orderedValues;
+
+        OptionsWithNames(DefaultQueryOptions wrapped, List<String> names)
+        {
+            super(wrapped);
+            this.names = names;
+        }
+
+        @Override
+        public QueryOptions prepare(List<ColumnSpecification> specs)
+        {
+            super.prepare(specs);
+
+            orderedValues = new ArrayList<ByteBuffer>(specs.size());
+            for (int i = 0; i < specs.size(); i++)
+            {
+                String name = specs.get(i).name.toString();
+                for (int j = 0; j < names.size(); j++)
+                {
+                    if (name.equals(names.get(j)))
+                    {
+                        orderedValues.add(wrapped.getValues().get(j));
+                        break;
+                    }
+                }
+            }
+            return this;
+        }
+
+        public List<ByteBuffer> getValues()
+        {
+            assert orderedValues != null; // We should have called prepare first!
+            return orderedValues;
+        }
     }
 
     // Options that are likely to not be present in most queries
-    private static class SpecificOptions
+    static class SpecificOptions
     {
-        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null);
+        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, -1L);
 
         private final int pageSize;
         private final PagingState state;
         private final ConsistencyLevel serialConsistency;
+        private final long timestamp;
 
-        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency)
+        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
         {
             this.pageSize = pageSize;
             this.state = state;
             this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
+            this.timestamp = timestamp;
         }
     }
 
@@ -151,7 +276,9 @@ public class QueryOptions
             SKIP_METADATA,
             PAGE_SIZE,
             PAGING_STATE,
-            SERIAL_CONSISTENCY;
+            SERIAL_CONSISTENCY,
+            TIMESTAMP,
+            NAMES_FOR_VALUES;
 
             public static EnumSet<Flag> deserialize(int flags)
             {
@@ -181,9 +308,21 @@ public class QueryOptions
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
             EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
 
-            List<ByteBuffer> values = flags.contains(Flag.VALUES)
-                                    ? CBUtil.readValueList(body)
-                                    : Collections.<ByteBuffer>emptyList();
+            List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+            List<String> names = null;
+            if (flags.contains(Flag.VALUES))
+            {
+                if (flags.contains(Flag.NAMES_FOR_VALUES))
+                {
+                    Pair<List<String>, List<ByteBuffer>> namesAndValues = CBUtil.readNameAndValueList(body);
+                    names = namesAndValues.left;
+                    values = namesAndValues.right;
+                }
+                else
+                {
+                    values = CBUtil.readValueList(body);
+                }
+            }
 
             boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
             flags.remove(Flag.VALUES);
@@ -195,9 +334,19 @@ public class QueryOptions
                 int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
                 PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
                 ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
-                options = new SpecificOptions(pageSize, pagingState, serialConsistency);
+                long timestamp = -1L;
+                if (flags.contains(Flag.TIMESTAMP))
+                {
+                    long ts = body.readLong();
+                    if (ts < 0)
+                        throw new ProtocolException("Invalid negative (" + ts + ") protocol level timestamp");
+                    timestamp = ts;
+                }
+
+                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
             }
-            return new QueryOptions(consistency, values, skipMetadata, options, version);
+            DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+            return names == null ? opts : new OptionsWithNames(opts, names);
         }
 
         public void encode(QueryOptions options, ByteBuf dest, int version)
@@ -217,6 +366,12 @@ public class QueryOptions
                 CBUtil.writeValue(options.getPagingState().serialize(), dest);
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+            if (flags.contains(Flag.TIMESTAMP))
+                dest.writeLong(options.getSpecificOptions().timestamp);
+
+            // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
+            // and in fact we never really encode QueryOptions, only decode them, so we
+            // don't bother.
         }
 
         public int encodedSize(QueryOptions options, int version)
@@ -236,6 +391,8 @@ public class QueryOptions
                 size += CBUtil.sizeOfValue(options.getPagingState().serialize());
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+            if (flags.contains(Flag.TIMESTAMP))
+                size += 8;
 
             return size;
         }
@@ -245,7 +402,7 @@ public class QueryOptions
             EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
             if (options.getValues().size() > 0)
                 flags.add(Flag.VALUES);
-            if (options.skipMetadata)
+            if (options.skipMetadata())
                 flags.add(Flag.SKIP_METADATA);
             if (options.getPageSize() >= 0)
                 flags.add(Flag.PAGE_SIZE);
@@ -253,6 +410,8 @@ public class QueryOptions
                 flags.add(Flag.PAGING_STATE);
             if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
                 flags.add(Flag.SERIAL_CONSISTENCY);
+            if (options.getSpecificOptions().timestamp >= 0)
+                flags.add(Flag.TIMESTAMP);
             return flags;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index e8cee15..40c45af 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -53,12 +53,12 @@ public class QueryProcessor implements QueryHandler
     private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
     private static final int MAX_CACHE_PREPARED_COUNT = 10000;
 
-    private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+    private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
     {
         @Override
-        public int weightOf(MD5Digest key, CQLStatement value)
+        public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
         {
-            return Ints.checkedCast(measure(key) + measure(value));
+            return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
         }
     };
 
@@ -71,12 +71,12 @@ public class QueryProcessor implements QueryHandler
         }
     };
 
-    private static final ConcurrentLinkedHashMap<MD5Digest, CQLStatement> preparedStatements;
+    private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
     private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
 
     static
     {
-        preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()
+        preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
                              .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
                              .weigher(cqlMemoryUsageWeigher)
                              .build();
@@ -90,7 +90,7 @@ public class QueryProcessor implements QueryHandler
     {
     }
 
-    public CQLStatement getPrepared(MD5Digest id)
+    public ParsedStatement.Prepared getPrepared(MD5Digest id)
     {
         return preparedStatements.get(id);
     }
@@ -154,29 +154,31 @@ public class QueryProcessor implements QueryHandler
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        return instance.process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+        return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
     }
 
     public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
-        CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
+        ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+        options.prepare(p.boundNames);
+        CQLStatement prepared = p.statement;
         if (prepared.getBoundTerms() != options.getValues().size())
             throw new InvalidRequestException("Invalid amount of bind variables");
 
         return processStatement(prepared, queryState, options);
     }
 
-    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+    public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
     {
-        return getStatement(queryStr, queryState.getClientState()).statement;
+        return getStatement(queryStr, queryState.getClientState());
     }
 
     public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
     {
         try
         {
-            ResultMessage result = instance.process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+            ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
             if (result instanceof ResultMessage.Rows)
                 return UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else
@@ -276,7 +278,7 @@ public class QueryProcessor implements QueryHandler
         else
         {
             MD5Digest statementId = MD5Digest.compute(toHash);
-            preparedStatements.put(statementId, prepared.statement);
+            preparedStatements.put(statementId, prepared);
             logger.trace(String.format("Stored prepared statement %s with %d bind markers",
                                        statementId,
                                        prepared.statement.getBoundTerms()));
@@ -312,8 +314,7 @@ public class QueryProcessor implements QueryHandler
         ClientState clientState = queryState.getClientState();
         batch.checkAccess(clientState);
         batch.validate(clientState);
-
-        batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
+        batch.execute(queryState, options);
         return new ResultMessage.Void();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 53ba380..eea0475 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -373,7 +373,7 @@ public class ResultSet
                     String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
                     String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
                     ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
-                    AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
+                    AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
                 return new Metadata(flags, names).setHasMorePages(state);
@@ -410,7 +410,7 @@ public class ResultSet
                             CBUtil.writeString(name.cfName, dest);
                         }
                         CBUtil.writeString(name.name.toString(), dest);
-                        DataType.codec.writeOne(DataType.fromType(name.type), dest);
+                        DataType.codec.writeOne(DataType.fromType(name.type, version), dest, version);
                     }
                 }
             }
@@ -442,7 +442,7 @@ public class ResultSet
                             size += CBUtil.sizeOfString(name.cfName);
                         }
                         size += CBUtil.sizeOfString(name.name.toString());
-                        size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
+                        size += DataType.codec.oneSerializedSize(DataType.fromType(name.type, version), version);
                     }
                 }
                 return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index e48a3ce..92a3510 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -88,7 +88,7 @@ public abstract class Sets
                 values.add(t);
             }
             DelayedValue value = new DelayedValue(((SetType)receiver.type).elements, values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -140,13 +140,13 @@ public abstract class Sets
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, SetType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, SetType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Set<?> s = (Set<?>)type.compose(value);
+                Set<?> s = (Set<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>(s.size());
                 for (Object element : s)
                     elements.add(type.elements.decompose(element));
@@ -158,9 +158,9 @@ public abstract class Sets
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
-            return CollectionType.pack(new ArrayList<ByteBuffer>(elements), elements.size());
+            return CollectionSerializer.pack(new ArrayList<ByteBuffer>(elements), elements.size(), options.getProtocolVersion());
         }
     }
 
@@ -186,12 +186,12 @@ public abstract class Sets
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
             for (Term t : elements)
             {
-                ByteBuffer bytes = t.bindAndGet(values);
+                ByteBuffer bytes = t.bindAndGet(options);
 
                 if (bytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
@@ -216,10 +216,10 @@ public abstract class Sets
             assert receiver.type instanceof SetType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type, options.getProtocolVersion());
         }
     }
 
@@ -253,7 +253,7 @@ public abstract class Sets
 
         static void doAdd(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -277,7 +277,7 @@ public abstract class Sets
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d539ecf..481514f 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -48,7 +48,7 @@ public interface Term
      * @return the result of binding all the variables of this NonTerminal (or
      * 'this' if the term is terminal).
      */
-    public Terminal bind(List<ByteBuffer> values) throws InvalidRequestException;
+    public Terminal bind(QueryOptions options) throws InvalidRequestException;
 
     /**
      * A shorter for bind(values).get().
@@ -56,7 +56,7 @@ public interface Term
      * object between the bind and the get (note that we still want to be able
      * to separate bind and get for collections).
      */
-    public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException;
+    public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException;
 
     /**
      * Whether or not that term contains at least one bind marker.
@@ -108,7 +108,7 @@ public interface Term
     public abstract class Terminal implements Term
     {
         public void collectMarkerSpecification(VariableSpecifications boundNames) {}
-        public Terminal bind(List<ByteBuffer> values) { return this; }
+        public Terminal bind(QueryOptions options) { return this; }
 
         // While some NonTerminal may not have bind markers, no Term can be Terminal
         // with a bind marker
@@ -120,11 +120,11 @@ public interface Term
         /**
          * @return the serialized value of this terminal.
          */
-        public abstract ByteBuffer get();
+        public abstract ByteBuffer get(QueryOptions options);
 
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            return get();
+            return get(options);
         }
     }
 
@@ -140,10 +140,10 @@ public interface Term
      */
     public abstract class NonTerminal implements Term
     {
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            Terminal t = bind(values);
-            return t == null ? null : t.get();
+            Terminal t = bind(options);
+            return t == null ? null : t.get(options);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index fad8fae..8a47536 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 public class UpdateParameters
 {
     public final CFMetaData metadata;
-    public final List<ByteBuffer> variables;
+    public final QueryOptions options;
     public final long timestamp;
     private final int ttl;
     public final int localDeletionTime;
@@ -42,10 +42,10 @@ public class UpdateParameters
     // For lists operation that require a read-before-write. Will be null otherwise.
     private final Map<ByteBuffer, CQL3Row> prefetchedLists;
 
-    public UpdateParameters(CFMetaData metadata, List<ByteBuffer> variables, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
+    public UpdateParameters(CFMetaData metadata, QueryOptions options, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
     {
         this.metadata = metadata;
-        this.variables = variables;
+        this.options = options;
         this.timestamp = timestamp;
         this.ttl = ttl;
         this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index 2fd1a0f..2faa960 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -65,7 +65,7 @@ public abstract class UserTypes
                 values.add(value);
             }
             DelayedValue value = new DelayedValue(((UserType)receiver.type), values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -144,12 +144,16 @@ public abstract class UserTypes
                 values.get(i).collectMarkerSpecification(boundNames);
         }
 
-        private ByteBuffer[] bindInternal(List<ByteBuffer> variables) throws InvalidRequestException
+        private ByteBuffer[] bindInternal(QueryOptions options) throws InvalidRequestException
         {
+            // Inside UDT values, collections must always be serialized in the protocol v3 format, whatever
+            // protocol version is actually in use, since that serialized value is stored directly.
+            options = options.withProtocolVersion(3);
+
             ByteBuffer[] buffers = new ByteBuffer[values.size()];
             for (int i = 0; i < type.types.size(); i++)
             {
-                ByteBuffer buffer = values.get(i).bindAndGet(variables);
+                ByteBuffer buffer = values.get(i).bindAndGet(options);
                 if (buffer == null)
                     throw new InvalidRequestException("null is not supported inside user type literals");
                 if (buffer.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -163,15 +167,15 @@ public abstract class UserTypes
             return buffers;
         }
 
-        public Constants.Value bind(List<ByteBuffer> variables) throws InvalidRequestException
+        public Constants.Value bind(QueryOptions options) throws InvalidRequestException
         {
-            return new Constants.Value(bindAndGet(variables));
+            return new Constants.Value(bindAndGet(options));
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> variables) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            return CompositeType.build(bindInternal(variables));
+            return CompositeType.build(bindInternal(options));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 083543a..f99a2e4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -46,19 +46,19 @@ public class FunctionCall extends Term.NonTerminal
             t.collectMarkerSpecification(boundNames);
     }
 
-    public Term.Terminal bind(List<ByteBuffer> values) throws InvalidRequestException
+    public Term.Terminal bind(QueryOptions options) throws InvalidRequestException
     {
-        return makeTerminal(fun, bindAndGet(values));
+        return makeTerminal(fun, bindAndGet(options), options.getProtocolVersion());
     }
 
-    public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+    public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
     {
         List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(terms.size());
         for (Term t : terms)
         {
             // For now, we don't allow nulls as argument as no existing function needs it and it
             // simplify things.
-            ByteBuffer val = t.bindAndGet(values);
+            ByteBuffer val = t.bindAndGet(options);
             if (val == null)
                 throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
             buffers.add(val);
@@ -77,16 +77,16 @@ public class FunctionCall extends Term.NonTerminal
         return false;
     }
 
-    private static Term.Terminal makeTerminal(Function fun, ByteBuffer result) throws InvalidRequestException
+    private static Term.Terminal makeTerminal(Function fun, ByteBuffer result, int version) throws InvalidRequestException
     {
         if (!(fun.returnType() instanceof CollectionType))
             return new Constants.Value(result);
 
         switch (((CollectionType)fun.returnType()).kind)
         {
-            case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType());
-            case SET:  return Sets.Value.fromSerialized(result, (SetType)fun.returnType());
-            case MAP:  return Maps.Value.fromSerialized(result, (MapType)fun.returnType());
+            case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType(), version);
+            case SET:  return Sets.Value.fromSerialized(result, (SetType)fun.returnType(), version);
+            case MAP:  return Maps.Value.fromSerialized(result, (MapType)fun.returnType(), version);
         }
         throw new AssertionError();
     }
@@ -119,7 +119,7 @@ public class FunctionCall extends Term.NonTerminal
             // If all parameters are terminal and the function is pure, we can
             // evaluate it now, otherwise we'd have to wait execution time
             return allTerminal && fun.isPure()
-                ? makeTerminal(fun, execute(fun, parameters))
+                ? makeTerminal(fun, execute(fun, parameters), QueryOptions.DEFAULT.getProtocolVersion())
                 : new FunctionCall(fun, parameters);
         }
 
@@ -130,7 +130,7 @@ public class FunctionCall extends Term.NonTerminal
             for (Term t : parameters)
             {
                 assert t instanceof Term.Terminal;
-                buffers.add(((Term.Terminal)t).get());
+                buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
             }
             return fun.execute(buffers);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 88bb644..95d504d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -116,16 +116,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementVariables);
-            addStatementMutations(statement, statementVariables, local, cl, timestamp, mutations);
+            QueryOptions statementOptions = options.forStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementOptions);
+            addStatementMutations(statement, statementOptions, local, timestamp, mutations);
         }
         return unzipMutations(mutations);
     }
@@ -143,9 +143,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     }
 
     private void addStatementMutations(ModificationStatement statement,
-                                       List<ByteBuffer> variables,
+                                       QueryOptions options,
                                        boolean local,
-                                       ConsistencyLevel cl,
                                        long now,
                                        Map<String, Map<ByteBuffer, IMutation>> mutations)
     throws RequestExecutionException, RequestValidationException
@@ -161,9 +160,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         // The following does the same than statement.getMutations(), but we inline it here because
         // we don't want to recreate mutations every time as this is particularly inefficient when applying
         // multiple batch to the same partition (see #6737).
-        List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
-        Composite clusteringPrefix = statement.createClusteringPrefix(variables);
-        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+        List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
+        Composite clusteringPrefix = statement.createClusteringPrefix(options);
+        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 
         for (ByteBuffer key : keys)
         {
@@ -172,7 +171,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             if (mutation == null)
             {
                 mut = new Mutation(ksName, key);
-                mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+                mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
                 ksMap.put(key, mutation);
             }
             else
@@ -209,29 +208,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        if (options.getConsistency() == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
-
-        return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), options.getSerialConsistency(), queryState.getTimestamp());
+        return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
     }
 
-    public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        if (cl == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
-
-        return execute(new BatchOfPreparedVariables(variables), false, cl, ConsistencyLevel.SERIAL, queryState.getTimestamp());
+        return execute(options, false, options.getTimestamp(queryState));
     }
 
-    public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
+    public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
-        // We'll need to fix that.
+        if (options.getConsistency() == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+        if (options.getSerialConsistency() == null)
+            throw new InvalidRequestException("Invalid empty serial consistency level");
+
         if (hasConditions)
-            return executeWithConditions(variables, cl, serialCl, now);
+            return executeWithConditions(options, now);
 
-        executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
         return null;
     }
 
@@ -251,8 +247,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
-
-    private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+    private ResultMessage executeWithConditions(BatchQueryOptions options, long now)
     throws RequestExecutionException, RequestValidationException
     {
         ByteBuffer key = null;
@@ -265,9 +260,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementVariables);
-            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+            QueryOptions statementOptions = options.forStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementOptions);
+            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementOptions);
             if (pks.size() > 1)
                 throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
             if (key == null)
@@ -283,10 +278,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
             }
 
-            Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
+            Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions);
             if (statement.hasConditions())
             {
-                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementOptions, timestamp);
                 // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition())
                     columnsWithConditions = null;
@@ -295,20 +290,20 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementOptions, false, now);
                 statement.addUpdateForKey(updates, key, clusteringPrefix, params);
             }
         }
 
         verifyBatchSize(Collections.singleton(updates));
-        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, options.getSerialConsistency(), options.getConsistency());
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }
 
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
-        for (IMutation mutation : getMutations(PreparedBatchVariables.EMPTY, true, null, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
@@ -322,38 +317,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
     }
 
-    public static class PreparedBatchVariables implements BatchVariables
-    {
-        public static final BatchVariables EMPTY = new PreparedBatchVariables(Collections.<ByteBuffer>emptyList());
-
-        private final List<ByteBuffer> variables;
-
-        public PreparedBatchVariables(List<ByteBuffer> variables)
-        {
-            this.variables = variables;
-        }
-
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
-        {
-            return variables;
-        }
-    }
-
-    public static class BatchOfPreparedVariables implements BatchVariables
-    {
-        private final List<List<ByteBuffer>> variables;
-
-        public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
-        {
-            this.variables = variables;
-        }
-
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
-        {
-            return variables.get(statementInBatch);
-        }
-    }
-
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
index 4003edc..5005d2f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -71,7 +71,7 @@ public class CQL3CasConditions implements CASConditions
             throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row");
     }
 
-    public void addConditions(Composite prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+    public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
     {
         RowCondition condition = conditions.get(prefix);
         if (condition == null)
@@ -83,7 +83,7 @@ public class CQL3CasConditions implements CASConditions
         {
             throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
         }
-        ((ColumnsConditions)condition).addConditions(conds, variables);
+        ((ColumnsConditions)condition).addConditions(conds, options);
     }
 
     public IDiskAtomFilter readFilter()
@@ -167,21 +167,21 @@ public class CQL3CasConditions implements CASConditions
 
     private static class ColumnsConditions extends RowCondition
     {
-        private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithVariables> conditions = new HashMap<>();
+        private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithOptions> conditions = new HashMap<>();
 
         private ColumnsConditions(Composite rowPrefix, long now)
         {
             super(rowPrefix, now);
         }
 
-        public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+        public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
         {
             for (ColumnCondition condition : conds)
             {
                 // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
                 // different list of variables.
-                ColumnCondition.WithVariables current = condition.with(variables);
-                ColumnCondition.WithVariables previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
+                ColumnCondition.WithOptions current = condition.with(options);
+                ColumnCondition.WithOptions previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
                 // If 2 conditions are actually equal, let it slide
                 if (previous != null && !previous.equalsTo(current))
                     throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
@@ -193,7 +193,7 @@ public class CQL3CasConditions implements CASConditions
             if (current == null)
                 return conditions.isEmpty();
 
-            for (ColumnCondition.WithVariables condition : conditions.values())
+            for (ColumnCondition.WithOptions condition : conditions.values())
                 if (!condition.appliesTo(rowPrefix, current, now))
                     return false;
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4741b9a..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -119,9 +119,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return cfm.isCounter();
     }
 
-    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+    public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimestamp(now, variables);
+        return attrs.getTimestamp(now, options);
     }
 
     public boolean isTimestampSet()
@@ -129,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return attrs.isTimestampSet();
     }
 
-    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+    public int getTimeToLive(QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimeToLive(variables);
+        return attrs.getTimeToLive(options);
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -284,7 +284,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         }
     }
 
-    public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+    public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
     throws InvalidRequestException
     {
         CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
@@ -295,7 +295,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             if (r == null)
                 throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
 
             if (keyBuilder.remainingCount() == 1)
             {
@@ -321,7 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return keys;
     }
 
-    public Composite createClusteringPrefix(List<ByteBuffer> variables)
+    public Composite createClusteringPrefix(QueryOptions options)
     throws InvalidRequestException
     {
         // If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -353,10 +353,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             }
         }
 
-        return createClusteringPrefixBuilderInternal(variables);
+        return createClusteringPrefixBuilderInternal(options);
     }
 
-    private Composite createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+    private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
     throws InvalidRequestException
     {
         CBuilder builder = cfm.comparator.prefixBuilder();
@@ -376,7 +376,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             }
             else
             {
-                List<ByteBuffer> values = r.values(variables);
+                List<ByteBuffer> values = r.values(options);
                 assert values.size() == 1; // We only allow IN for row keys so far
                 ByteBuffer val = values.get(0);
                 if (val == null)
@@ -488,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -498,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = options.getValues();
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
         // We don't support IN for CAS operation so far
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
         ByteBuffer key = keys.get(0);
 
-        CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
-        Composite prefix = createClusteringPrefix(variables);
+        long now = options.getTimestamp(queryState);
+        CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+        Composite prefix = createClusteringPrefix(options);
         ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
-        addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+        addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
@@ -521,10 +521,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return new ResultMessage.Rows(buildCasResultSet(key, result));
     }
 
-    public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+    public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
     throws InvalidRequestException
     {
-        UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+        UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
         addUpdateForKey(updates, key, clusteringPrefix, updParams);
 
         if (ifNotExists)
@@ -541,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
         {
             if (columnConditions != null)
-                conditions.addConditions(clusteringPrefix, columnConditions, variables);
+                conditions.addConditions(clusteringPrefix, columnConditions, options);
             if (staticConditions != null)
-                conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, variables);
+                conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
         }
     }
 
@@ -614,7 +614,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
         long now = System.currentTimeMillis();
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
-        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
 
         return builder.build();
     }
@@ -624,7 +624,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
@@ -636,7 +636,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     /**
      * Convert statement into a list of mutations to apply on the server
      *
-     * @param variables value for prepared statement markers
+     * @param options value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
      * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
      * @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -644,13 +644,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
-        Composite clusteringPrefix = createClusteringPrefix(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
+        Composite clusteringPrefix = createClusteringPrefix(options);
 
-        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
@@ -659,22 +659,21 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             Mutation mut = new Mutation(cfm.ksName, key, cf);
-            mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
+            mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
         }
         return mutations;
     }
 
     public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
                                                  Composite prefix,
-                                                 List<ByteBuffer> variables,
+                                                 QueryOptions options,
                                                  boolean local,
-                                                 ConsistencyLevel cl,
                                                  long now)
     throws RequestExecutionException, RequestValidationException
     {
         // Some lists operation requires reading
-        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
-        return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+        return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
     }
 
     public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 6b7eca7..4fd02c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -42,7 +42,7 @@ public interface Restriction
     public boolean isContains();
 
     // Not supported by Slice, but it's convenient to have here
-    public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
 
     public static class EQ implements Restriction
     {
@@ -55,9 +55,9 @@ public interface Restriction
             this.onToken = onToken;
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
-            return Collections.singletonList(value.bindAndGet(variables));
+            return Collections.singletonList(value.bindAndGet(options));
         }
 
         public boolean isSlice()
@@ -145,11 +145,11 @@ public interface Restriction
                 this.values = values;
             }
 
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+            public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
             {
                 List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
                 for (Term value : values)
-                    buffers.add(value.bindAndGet(variables));
+                    buffers.add(value.bindAndGet(options));
                 return buffers;
             }
 
@@ -174,9 +174,9 @@ public interface Restriction
                 this.marker = marker;
             }
 
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+            public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
             {
-                Lists.Value lval = marker.bind(variables);
+                Lists.Value lval = marker.bind(options);
                 if (lval == null)
                     throw new InvalidRequestException("Invalid null value for IN restriction");
                 return lval.elements;
@@ -234,7 +234,7 @@ public interface Restriction
             return false;
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
             throw new UnsupportedOperationException();
         }
@@ -249,9 +249,9 @@ public interface Restriction
             return bounds[b.idx] != null;
         }
 
-        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+        public ByteBuffer bound(Bound b, QueryOptions options) throws InvalidRequestException
         {
-            return bounds[b.idx].bindAndGet(variables);
+            return bounds[b.idx].bindAndGet(options);
         }
 
         public boolean isInclusive(Bound b)
@@ -379,25 +379,25 @@ public interface Restriction
             keys.add(t);
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
             if (values == null)
                 return Collections.emptyList();
 
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
             for (Term value : values)
-                buffers.add(value.bindAndGet(variables));
+                buffers.add(value.bindAndGet(options));
             return buffers;
         }
 
-        public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
         {
             if (keys == null)
                 return Collections.emptyList();
 
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
             for (Term value : keys)
-                buffers.add(value.bindAndGet(variables));
+                buffers.add(value.bindAndGet(options));
             return buffers;
         }
 


[2/5] Native protocol v3

Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d79bd5b..b9ccd1a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -185,23 +185,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
-        List<ByteBuffer> variables = options.getValues();
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
-        int limit = getLimit(variables);
+        int limit = getLimit(options);
         int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
         Pageable command;
         if (isKeyRange || usesSecondaryIndexing)
         {
-            command = getRangeCommand(variables, limitForQuery, now);
+            command = getRangeCommand(options, limitForQuery, now);
         }
         else
         {
-            List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
             command = commands == null ? null : new Pageable.ReadCommands(commands);
         }
 
@@ -214,13 +213,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
         {
-            return execute(command, cl, variables, limit, now);
+            return execute(command, options, limit, now);
         }
         else
         {
             QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
             if (parameters.isCount)
-                return pageCountQuery(pager, variables, pageSize, now, limit);
+                return pageCountQuery(pager, options, pageSize, now, limit);
 
             // We can't properly do post-query ordering if we page (see #6722)
             if (needsPostQueryOrdering())
@@ -228,14 +227,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                                 + "ORDER BY or the IN and sort client side, or disable paging for this query");
 
             List<Row> page = pager.fetchPage(pageSize);
-            ResultMessage.Rows msg = processResults(page, variables, limit, now);
+            ResultMessage.Rows msg = processResults(page, options, limit, now);
             if (!pager.isExhausted())
                 msg.result.metadata.setHasMorePages(pager.state());
             return msg;
         }
     }
 
-    private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
     {
         List<Row> rows;
         if (command == null)
@@ -245,21 +244,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         else
         {
             rows = command instanceof Pageable.ReadCommands
-                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
-                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency())
+                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
         }
 
-        return processResults(rows, variables, limit, now);
+        return processResults(rows, options, limit, now);
     }
 
-    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows pageCountQuery(QueryPager pager, QueryOptions options, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
     {
         int count = 0;
         while (!pager.isExhausted())
         {
             int maxLimit = pager.maxRemaining();
             logger.debug("New maxLimit for paged count query is {}", maxLimit);
-            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
+            ResultSet rset = process(pager.fetchPage(pageSize), options, maxLimit, now);
             count += rset.rows.size();
         }
 
@@ -269,10 +268,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return new ResultMessage.Rows(result);
     }
 
-    public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
-        ResultSet rset = process(rows, variables, limit, now);
+        ResultSet rset = process(rows, options, limit, now);
         rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
         return new ResultMessage.Rows(rset);
     }
@@ -288,29 +287,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
     public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = Collections.emptyList();
-        int limit = getLimit(variables);
+        QueryOptions options = QueryOptions.DEFAULT;
+        int limit = getLimit(options);
         int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
         List<Row> rows;
         if (isKeyRange || usesSecondaryIndexing)
         {
-            RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now);
+            RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
             rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
         }
         else
         {
-            List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
             rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
         }
 
-        return processResults(rows, variables, limit, now);
+        return processResults(rows, options, limit, now);
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
+        QueryOptions options = QueryOptions.DEFAULT;
+        return process(rows, options, getLimit(options), System.currentTimeMillis());
     }
 
     public String keyspace()
@@ -323,15 +323,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return cfm.cfName;
     }
 
-    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
     {
-        Collection<ByteBuffer> keys = getKeys(variables);
+        Collection<ByteBuffer> keys = getKeys(options);
         if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
             return null;
 
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
 
-        IDiskAtomFilter filter = makeFilter(variables, limit);
+        IDiskAtomFilter filter = makeFilter(options, limit);
         if (filter == null)
             return null;
 
@@ -349,29 +349,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return commands;
     }
 
-    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
     {
-        IDiskAtomFilter filter = makeFilter(variables, limit);
+        IDiskAtomFilter filter = makeFilter(options, limit);
         if (filter == null)
             return null;
 
-        List<IndexExpression> expressions = getIndexExpressions(variables);
+        List<IndexExpression> expressions = getIndexExpressions(options);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+        AbstractBounds<RowPosition> keyBounds = getKeyBounds(options);
         return keyBounds == null
              ? null
              : new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
     }
 
-    private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
+    private AbstractBounds<RowPosition> getKeyBounds(QueryOptions options) throws InvalidRequestException
     {
         IPartitioner<?> p = StorageService.getPartitioner();
 
         if (onToken)
         {
-            Token startToken = getTokenBound(Bound.START, variables, p);
-            Token endToken = getTokenBound(Bound.END, variables, p);
+            Token startToken = getTokenBound(Bound.START, options, p);
+            Token endToken = getTokenBound(Bound.END, options, p);
 
             boolean includeStart = includeKeyBound(Bound.START);
             boolean includeEnd = includeKeyBound(Bound.END);
@@ -397,8 +397,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
-            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
+            ByteBuffer startKeyBytes = getKeyBound(Bound.START, options);
+            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options);
 
             RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
             RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
@@ -421,7 +421,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
-    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
+    private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
     throws InvalidRequestException
     {
         if (parameters.isDistinct)
@@ -431,8 +431,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         else if (isColumnRange())
         {
             int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
-            List<Composite> startBounds = getRequestedBound(Bound.START, variables);
-            List<Composite> endBounds = getRequestedBound(Bound.END, variables);
+            List<Composite> startBounds = getRequestedBound(Bound.START, options);
+            List<Composite> endBounds = getRequestedBound(Bound.END, options);
             assert startBounds.size() == endBounds.size();
 
             // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
@@ -516,7 +516,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            SortedSet<CellName> cellNames = getRequestedColumns(variables);
+            SortedSet<CellName> cellNames = getRequestedColumns(options);
             if (cellNames == null) // in case of IN () for the last column of the key
                 return null;
             QueryProcessor.validateCellNames(cellNames, cfm.comparator);
@@ -534,12 +534,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return new SliceQueryFilter(slices, isReversed, limit, toGroup);
     }
 
-    private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
+    private int getLimit(QueryOptions options) throws InvalidRequestException
     {
         int l = Integer.MAX_VALUE;
         if (limit != null)
         {
-            ByteBuffer b = limit.bindAndGet(variables);
+            ByteBuffer b = limit.bindAndGet(options);
             if (b == null)
                 throw new InvalidRequestException("Invalid null value of limit");
 
@@ -569,7 +569,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
              : limit;
     }
 
-    private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
+    private Collection<ByteBuffer> getKeys(final QueryOptions options) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         CBuilder builder = cfm.getKeyValidatorAsCType().builder();
@@ -578,7 +578,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             Restriction r = keyRestrictions[def.position()];
             assert r != null && !r.isSlice();
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
 
             if (builder.remainingCount() == 1)
             {
@@ -603,7 +603,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return keys;
     }
 
-    private ByteBuffer getKeyBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
     {
         // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first
         // component of a composite partition key).
@@ -612,10 +612,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
+        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer();
     }
 
-    private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
+    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner<?> p) throws InvalidRequestException
     {
         assert onToken;
 
@@ -623,7 +623,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         ByteBuffer value;
         if (keyRestriction.isEQ())
         {
-            value = keyRestriction.values(variables).get(0);
+            value = keyRestriction.values(options).get(0);
         }
         else
         {
@@ -631,7 +631,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (!slice.hasBound(b))
                 return p.getMinimumToken();
 
-            value = slice.bound(b, variables);
+            value = slice.bound(b, options);
         }
 
         if (value == null)
@@ -669,7 +669,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return false;
     }
 
-    private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+    private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
     {
         // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
         // we always do a slice for CQL3 tables, so it's ok to ignore them here
@@ -682,7 +682,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             ColumnDefinition def = idIter.next();
             assert r != null && !r.isSlice();
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
             if (values.size() == 1)
             {
                 ByteBuffer val = values.get(0);
@@ -772,7 +772,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                               Restriction[] restrictions,
                                               boolean isReversed,
                                               CType type,
-                                              List<ByteBuffer> variables) throws InvalidRequestException
+                                              QueryOptions options) throws InvalidRequestException
     {
         CBuilder builder = type.builder();
 
@@ -801,7 +801,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
             if (r.isSlice())
             {
-                builder.add(getSliceValue(def, r, b, variables));
+                builder.add(getSliceValue(def, r, b, options));
                 Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b);
 
                 // We can have more non null restriction if the "scalar" notation was used for the bound (#4851).
@@ -813,13 +813,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     if (isNullRestriction(r, b))
                         break;
 
-                    builder.add(getSliceValue(def, r, b, variables));
+                    builder.add(getSliceValue(def, r, b, options));
                 }
                 return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
             }
             else
             {
-                List<ByteBuffer> values = r.values(variables);
+                List<ByteBuffer> values = r.values(options);
                 if (values.size() != 1)
                 {
                     // IN query, we only support it on the clustering column
@@ -878,23 +878,23 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
     }
 
-    private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
     {
         Restriction.Slice slice = (Restriction.Slice)r;
         assert slice.hasBound(b);
-        ByteBuffer val = slice.bound(b, variables);
+        ByteBuffer val = slice.bound(b, options);
         if (val == null)
             throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
         return val;
     }
 
-    private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
     {
         assert isColumnRange();
-        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
+        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
     }
 
-    public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
+    public List<IndexExpression> getIndexExpressions(QueryOptions options) throws InvalidRequestException
     {
         if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
             return Collections.emptyList();
@@ -927,7 +927,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 {
                     if (slice.hasBound(b))
                     {
-                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
+                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
                         expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
                     }
                 }
@@ -935,12 +935,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             else if (restriction.isContains())
             {
                 Restriction.Contains contains = (Restriction.Contains)restriction;
-                for (ByteBuffer value : contains.values(variables))
+                for (ByteBuffer value : contains.values(options))
                 {
                     validateIndexedValue(def, value);
                     expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
                 }
-                for (ByteBuffer key : contains.keys(variables))
+                for (ByteBuffer key : contains.keys(options))
                 {
                     validateIndexedValue(def, key);
                     expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
@@ -948,7 +948,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                List<ByteBuffer> values = restriction.values(variables);
+                List<ByteBuffer> values = restriction.values(options);
 
                 if (values.size() != 1)
                     throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
@@ -969,13 +969,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return value;
     }
 
-    private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final List<ByteBuffer> variables) throws InvalidRequestException
+    private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
     {
         assert sliceRestriction != null;
 
         final CellNameType type = cfm.comparator;
-        final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
-        final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
+        final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, options));
+        final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, options));
 
         return new AbstractIterator<Cell>()
         {
@@ -998,7 +998,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         };
     }
 
-    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
+    private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
     {
         Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
         for (org.apache.cassandra.db.Row row : rows)
@@ -1007,12 +1007,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (row.cf == null)
                 continue;
 
-            processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
+            processColumnFamily(row.key.getKey(), row.cf, options, now, result);
         }
 
         ResultSet cqlRows = result.build();
 
-        orderResults(cqlRows, variables);
+        orderResults(cqlRows);
 
         // Internal calls always return columns in the comparator order, even when reverse was set
         if (isReversed)
@@ -1024,7 +1024,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     }
 
     // Used by ModificationStatement for CAS operations
-    void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
+    void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
     throws InvalidRequestException
     {
         CFMetaData cfm = cf.metadata();
@@ -1040,7 +1040,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         Iterator<Cell> cells = cf.getSortedColumns().iterator();
         if (sliceRestriction != null)
-            cells = applySliceRestriction(cells, variables);
+            cells = applySliceRestriction(cells, options);
 
         CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
 
@@ -1059,7 +1059,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         result.add(keyComponents[def.position()]);
                         break;
                     case STATIC:
-                        addValue(result, def, staticRow);
+                        addValue(result, def, staticRow, options);
                         break;
                     default:
                         result.add((ByteBuffer)null);
@@ -1089,17 +1089,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         result.add(cql3Row.getColumn(null));
                         break;
                     case REGULAR:
-                        addValue(result, def, cql3Row);
+                        addValue(result, def, cql3Row, options);
                         break;
                     case STATIC:
-                        addValue(result, def, staticRow);
+                        addValue(result, def, staticRow, options);
                         break;
                 }
             }
         }
     }
 
-    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row)
+    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
     {
         if (row == null)
         {
@@ -1112,7 +1112,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             List<Cell> collection = row.getCollection(def.name);
             ByteBuffer value = collection == null
                              ? null
-                             : ((CollectionType)def.type).serialize(collection);
+                             : ((CollectionType)def.type).serializeForNativeProtocol(collection, options.getProtocolVersion());
             result.add(value);
             return;
         }
@@ -1137,7 +1137,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     /**
      * Orders results when multiple keys are selected (using IN)
      */
-    private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
+    private void orderResults(ResultSet cqlRows) throws InvalidRequestException
     {
         if (cqlRows.size() == 0 || !needsPostQueryOrdering())
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 6f9a270..3902e05 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -362,7 +362,7 @@ public class DefsTables
                     dropType(type);
 
                 for (MapDifference.ValueDifference<UserType> tdiff : typesDiff.entriesDiffering().values())
-                    addType(tdiff.rightValue()); // use the most recent value
+                    updateType(tdiff.rightValue()); // use the most recent value
             }
         }
     }
@@ -412,7 +412,7 @@ public class DefsTables
         ksm.userTypes.addType(ut);
 
         if (!StorageService.instance.isClientMode())
-            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+            MigrationManager.instance.notifyCreateUserType(ut);
     }
 
     private static void updateKeyspace(KSMetaData newState)
@@ -444,6 +444,19 @@ public class DefsTables
         }
     }
 
+    private static void updateType(UserType ut)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        logger.info("Updating {}", ut);
+
+        ksm.userTypes.addType(ut);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyUpdateUserType(ut);
+    }
+
     private static void dropKeyspace(String ksName)
     {
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -515,7 +528,7 @@ public class DefsTables
         ksm.userTypes.removeType(ut);
 
         if (!StorageService.instance.isClientMode())
-            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+            MigrationManager.instance.notifyDropUserType(ut); // the type was removed: signal a drop, not an update
     }
 
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 5db4ba0..7f75a5f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -57,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
 
     protected abstract void appendToStringBuilder(StringBuilder sb);
 
-    public abstract ByteBuffer serialize(List<Cell> cells);
+    public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
 
     @Override
     public String toString()
@@ -110,22 +111,9 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return true;
     }
 
-    // Utilitary method
-    protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+    protected List<Cell> enforceLimit(List<Cell> cells, int version)
     {
-        ByteBuffer result = ByteBuffer.allocate(2 + size);
-        result.putShort((short)elements);
-        for (ByteBuffer bb : buffers)
-        {
-            result.putShort((short)bb.remaining());
-            result.put(bb.duplicate());
-        }
-        return (ByteBuffer)result.flip();
-    }
-
-    protected List<Cell> enforceLimit(List<Cell> cells)
-    {
-        if (cells.size() <= MAX_ELEMENTS)
+        if (version >= 3 || cells.size() <= MAX_ELEMENTS)
             return cells;
 
         logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. "
@@ -133,12 +121,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return cells.subList(0, MAX_ELEMENTS);
     }
 
-    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+    public ByteBuffer serializeForNativeProtocol(List<Cell> cells, int version)
     {
-        int size = 0;
-        for (ByteBuffer bb : buffers)
-            size += 2 + bb.remaining();
-        return pack(buffers, elements, size);
+        cells = enforceLimit(cells, version);
+        List<ByteBuffer> values = serializedValues(cells);
+        return CollectionSerializer.pack(values, cells.size(), version);
     }
 
     public CQL3Type asCQL3Type()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 43ace65..6e6821b 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -72,7 +72,7 @@ public class ListType<T> extends CollectionType<List<T>>
         return elements;
     }
 
-    public TypeSerializer<List<T>> getSerializer()
+    public ListSerializer<T> getSerializer()
     {
         return serializer;
     }
@@ -112,17 +112,11 @@ public class ListType<T> extends CollectionType<List<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        int size = 0;
         for (Cell c : cells)
-        {
             bbs.add(c.value());
-            size += 2 + c.value().remaining();
-        }
-        return pack(bbs, cells.size(), size);
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 213e213..71023a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -108,7 +108,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public TypeSerializer<Map<K, V>> getSerializer()
+    public MapSerializer<K, V> getSerializer()
     {
         return serializer;
     }
@@ -123,23 +123,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values)));
     }
 
-    /**
-     * Creates the same output than serialize, but from the internal representation.
-     */
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * cells.size());
-        int size = 0;
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
         for (Cell c : cells)
         {
-            ByteBuffer key = c.name().collectionElement();
-            ByteBuffer value = c.value();
-            bbs.add(key);
-            bbs.add(value);
-            size += 4 + key.remaining() + value.remaining();
+            bbs.add(c.name().collectionElement());
+            bbs.add(c.value());
         }
-        return pack(bbs, cells.size(), size);
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 3b686b8..d2f7f12 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -77,7 +77,7 @@ public class SetType<T> extends CollectionType<Set<T>>
         return ListType.compareListOrSet(elements, o1, o2);
     }
 
-    public TypeSerializer<Set<T>> getSerializer()
+    public SetSerializer<T> getSerializer()
     {
         return serializer;
     }
@@ -92,18 +92,11 @@ public class SetType<T> extends CollectionType<Set<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        int size = 0;
         for (Cell c : cells)
-        {
-            ByteBuffer key = c.name().collectionElement();
-            bbs.add(key);
-            size += 2 + key.remaining();
-        }
-        return pack(bbs, cells.size(), size);
+            bbs.add(c.name().collectionElement());
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index eb95fb9..973a5be 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -64,6 +64,11 @@ public class UserType extends CompositeType
         return new UserType(keyspace, name, columnNames, columnTypes);
     }
 
+    public String getNameAsString()
+    {
+        return UTF8Type.instance.compose(name);
+    }
+
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index af88853..9e3abcf 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -431,8 +432,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         {
             ByteBuffer buffer = objToBB(sub);
             serialized.add(buffer);
-        }      
-        return CollectionType.pack(serialized, objects.size());
+        }
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
     }
 
     private ByteBuffer objToMapBB(List<Object> objects)
@@ -447,7 +450,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 serialized.add(buffer);
             }
         } 
-        return CollectionType.pack(serialized, objects.size());
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
     }
 
     private ByteBuffer objToCompositeBB(List<Object> objects)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 9b7a8e7..6993b19 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -195,14 +195,15 @@ public class CQLSSTableWriter
         if (values.size() != boundNames.size())
             throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
 
-        List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
-        Composite clusteringPrefix = insert.createClusteringPrefix(values);
+        QueryOptions options = QueryOptions.forInternalCalls(null, values);
+        List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+        Composite clusteringPrefix = insert.createClusteringPrefix(options);
 
         long now = System.currentTimeMillis() * 1000;
         UpdateParameters params = new UpdateParameters(insert.cfm,
-                                                       values,
-                                                       insert.getTimestamp(now, values),
-                                                       insert.getTimeToLive(values),
+                                                       options,
+                                                       insert.getTimestamp(now, options),
+                                                       insert.getTimeToLive(options),
                                                        Collections.<ByteBuffer, CQL3Row>emptyMap());
 
         for (ByteBuffer key: keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 83a391d..0e16fda 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.serializers;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
+
 public abstract class CollectionSerializer<T> implements TypeSerializer<T>
 {
     public void validate(ByteBuffer bytes) throws MarshalException
@@ -28,24 +30,104 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         // The collection is not currently being properly validated.
     }
 
-    // Utilitary method
-    protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+    protected abstract List<ByteBuffer> serializeValues(T value);
+    protected abstract int getElementCount(T value);
+
+    public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version);
+
+    public ByteBuffer serialize(T value)
+    {
+        List<ByteBuffer> values = serializeValues(value);
+        // The only case where we serialize/deserialize collections internally (i.e. not for the protocol's sake)
+        // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+        return pack(values, getElementCount(value), 3);
+    }
+
+    public T deserialize(ByteBuffer bytes)
     {
-        ByteBuffer result = ByteBuffer.allocate(2 + size);
-        result.putShort((short)elements);
+        // The only case where we serialize/deserialize collections internally (i.e. not for the protocol's sake)
+        // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+        return deserializeForNativeProtocol(bytes, 3);
+    }
+
+    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int version)
+    {
+        int size = 0;
+        for (ByteBuffer bb : buffers)
+            size += sizeOfValue(bb, version);
+
+        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, version) + size);
+        writeCollectionSize(result, elements, version);
         for (ByteBuffer bb : buffers)
+            writeValue(result, bb, version);
+        return (ByteBuffer)result.flip();
+    }
+
+    protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
+    {
+        if (version >= 3)
+            output.putInt(elements);
+        else
+            output.putShort((short)elements);
+    }
+
+    protected static int readCollectionSize(ByteBuffer input, int version)
+    {
+        return version >= 3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+    }
+
+    protected static int sizeOfCollectionSize(int elements, int version)
+    {
+        return version >= 3 ? 4 : 2;
+    }
+
+    protected static void writeValue(ByteBuffer output, ByteBuffer value, int version)
+    {
+        if (version >= 3)
         {
-            result.putShort((short)bb.remaining());
-            result.put(bb.duplicate());
+            if (value == null)
+            {
+                output.putInt(-1);
+                return;
+            }
+
+            output.putInt(value.remaining());
+            output.put(value.duplicate());
+        }
+        else
+        {
+            assert value != null;
+            output.putShort((short)value.remaining());
+            output.put(value.duplicate());
         }
-        return (ByteBuffer)result.flip();
     }
 
-    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+    protected static ByteBuffer readValue(ByteBuffer input, int version)
     {
-        int size = 0;
-        for (ByteBuffer bb : buffers)
-            size += 2 + bb.remaining();
-        return pack(buffers, elements, size);
+        if (version >= 3)
+        {
+            int size = input.getInt();
+            if (size < 0)
+                return null;
+
+            return ByteBufferUtil.readBytes(input, size);
+        }
+        else
+        {
+            return ByteBufferUtil.readBytesWithShortLength(input);
+        }
+    }
+
+    protected static int sizeOfValue(ByteBuffer value, int version)
+    {
+        if (version >= 3)
+        {
+            return value == null ? 4 : 4 + value.remaining();
+        }
+        else
+        {
+            assert value != null;
+            return 2 + value.remaining();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index 59f25d2..e662341 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -47,16 +47,29 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         this.elements = elements;
     }
 
-    public List<T> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(List<T> values)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(values.size());
+        for (T value : values)
+            buffers.add(elements.serialize(value));
+        return buffers;
+    }
+
+    public int getElementCount(List<T> value)
+    {
+        return value.size();
+    }
+
+    public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             List<T> l = new ArrayList<T>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer databb = readValue(input, version);
                 elements.validate(databb);
                 l.add(elements.deserialize(databb));
             }
@@ -68,26 +81,6 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
-     * where:
-     *   n is the number of elements
-     *   s_i is the number of bytes composing the ith element
-     *   b_i is the s_i bytes composing the ith element
-     */
-    public ByteBuffer serialize(List<T> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
-        int size = 0;
-        for (T elt : value)
-        {
-            ByteBuffer bb = elements.serialize(elt);
-            bbs.add(bb);
-            size += 2 + bb.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(List<T> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index f79d07f..5d349dd 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -51,19 +51,35 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         this.values = values;
     }
 
-    public Map<K, V> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(Map<K, V> map)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(map.size() * 2);
+        for (Map.Entry<K, V> entry : map.entrySet())
+        {
+            buffers.add(keys.serialize(entry.getKey()));
+            buffers.add(values.serialize(entry.getValue()));
+        }
+        return buffers;
+    }
+
+    public int getElementCount(Map<K, V> value)
+    {
+        return value.size();
+    }
+
+    public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             Map<K, V> m = new LinkedHashMap<K, V>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer kbb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer kbb = readValue(input, version);
                 keys.validate(kbb);
 
-                ByteBuffer vbb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer vbb = readValue(input, version);
                 values.validate(vbb);
 
                 m.put(keys.deserialize(kbb), values.deserialize(vbb));
@@ -76,30 +92,6 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><sk_1><k_1><sv_1><v_1>...<sk_n><k_n><sv_n><v_n> }
-     * where:
-     *   n is the number of elements
-     *   sk_i is the number of bytes composing the ith key k_i
-     *   k_i is the sk_i bytes composing the ith key
-     *   sv_i is the number of bytes composing the ith value v_i
-     *   v_i is the sv_i bytes composing the ith value
-     */
-    public ByteBuffer serialize(Map<K, V> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * value.size());
-        int size = 0;
-        for (Map.Entry<K, V> entry : value.entrySet())
-        {
-            ByteBuffer bbk = keys.serialize(entry.getKey());
-            ByteBuffer bbv = values.serialize(entry.getValue());
-            bbs.add(bbk);
-            bbs.add(bbv);
-            size += 4 + bbk.remaining() + bbv.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(Map<K, V> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index d6d7062..812dd68 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -47,16 +47,29 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         this.elements = elements;
     }
 
-    public Set<T> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(Set<T> values)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(values.size());
+        for (T value : values)
+            buffers.add(elements.serialize(value));
+        return buffers;
+    }
+
+    public int getElementCount(Set<T> value)
+    {
+        return value.size();
+    }
+
+    public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             Set<T> l = new LinkedHashSet<T>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer databb = readValue(input, version);
                 elements.validate(databb);
                 l.add(elements.deserialize(databb));
             }
@@ -68,26 +81,6 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
-     * where:
-     *   n is the number of elements
-     *   s_i is the number of bytes composing the ith element
-     *   b_i is the s_i bytes composing the ith element
-     */
-    public ByteBuffer serialize(Set<T> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
-        int size = 0;
-        for (T elt : value)
-        {
-            ByteBuffer bb = elements.serialize(elt);
-            bbs.add(bb);
-            size += 2 + bb.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(Set<T> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index e16ac62..4d142bd 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -21,10 +21,13 @@ public interface IMigrationListener
 {
     public void onCreateKeyspace(String ksName);
     public void onCreateColumnFamily(String ksName, String cfName);
+    public void onCreateUserType(String ksName, String typeName);
 
     public void onUpdateKeyspace(String ksName);
     public void onUpdateColumnFamily(String ksName, String cfName);
+    public void onUpdateUserType(String ksName, String typeName);
 
     public void onDropKeyspace(String ksName);
     public void onDropColumnFamily(String ksName, String cfName);
+    public void onDropUserType(String ksName, String typeName);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 7eb7282..ec46d3f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -167,6 +167,12 @@ public class MigrationManager
             listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyCreateUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public void notifyUpdateKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
@@ -179,6 +185,12 @@ public class MigrationManager
             listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyUpdateUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public void notifyDropKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
@@ -191,6 +203,12 @@ public class MigrationManager
             listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyDropUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
     {
         announceNewKeyspace(ksm, FBUtilities.timestampMicros());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index c4abe0b..3040aaf 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1969,7 +1969,7 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             ThriftClientState cState = state();
-            return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
+            return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2100,7 +2100,7 @@ public class CassandraServer implements Cassandra.Iface
 
             return cState.getCQLQueryHandler().processPrepared(statement,
                                                                cState.getQueryState(),
-                                                               new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
+                                                               QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
         }
         catch (RequestExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5222a1..36a7e71 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -363,6 +364,22 @@ public abstract class CBUtil
         return size;
     }
 
+    public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb)
+    {
+        int size = cb.readUnsignedShort();
+        if (size == 0)
+            return Pair.create(Collections.<String>emptyList(), Collections.<ByteBuffer>emptyList());
+
+        List<String> s = new ArrayList<>(size);
+        List<ByteBuffer> l = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+        {
+            s.add(readString(cb));
+            l.add(readValue(cb));
+        }
+        return Pair.create(s, l);
+    }
+
     public static InetSocketAddress readInet(ByteBuf cb)
     {
         int addrSize = cb.readByte();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4a50bde..989b954 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -128,7 +128,7 @@ public class Client extends SimpleClient
                     return null;
                 }
             }
-            return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
+            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -156,7 +156,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values));
+                return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index f0b5d95..3cff973 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.transport;
 
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,7 +51,9 @@ public enum DataType implements OptionCodec.Codecable<DataType>
     INET     (16, InetAddressType.instance),
     LIST     (32, null),
     MAP      (33, null),
-    SET      (34, null);
+    SET      (34, null),
+    UDT      (48, null);
+
 
     public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
 
@@ -78,27 +80,39 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         return id;
     }
 
-    public Object readValue(ByteBuf cb)
+    public Object readValue(ByteBuf cb, int version)
     {
         switch (this)
         {
             case CUSTOM:
                 return CBUtil.readString(cb);
             case LIST:
-                return DataType.toType(codec.decodeOne(cb));
+                return DataType.toType(codec.decodeOne(cb, version));
             case SET:
-                return DataType.toType(codec.decodeOne(cb));
+                return DataType.toType(codec.decodeOne(cb, version));
             case MAP:
                 List<AbstractType> l = new ArrayList<AbstractType>(2);
-                l.add(DataType.toType(codec.decodeOne(cb)));
-                l.add(DataType.toType(codec.decodeOne(cb)));
+                l.add(DataType.toType(codec.decodeOne(cb, version)));
+                l.add(DataType.toType(codec.decodeOne(cb, version)));
                 return l;
+            case UDT:
+                String ks = CBUtil.readString(cb);
+                ByteBuffer name = UTF8Type.instance.decompose(CBUtil.readString(cb));
+                int n = cb.readUnsignedShort();
+                List<ByteBuffer> fieldNames = new ArrayList<>(n);
+                List<AbstractType<?>> fieldTypes = new ArrayList<>(n);
+                for (int i = 0; i < n; i++)
+                {
+                    fieldNames.add(UTF8Type.instance.decompose(CBUtil.readString(cb)));
+                    fieldTypes.add(DataType.toType(codec.decodeOne(cb, version)));
+                }
+                return new UserType(ks, name, fieldNames, fieldTypes);
             default:
                 return null;
         }
     }
 
-    public void writeValue(Object value, ByteBuf cb)
+    public void writeValue(Object value, ByteBuf cb, int version)
     {
         switch (this)
         {
@@ -107,40 +121,63 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                 CBUtil.writeString((String)value, cb);
                 break;
             case LIST:
-                codec.writeOne(DataType.fromType((AbstractType)value), cb);
+                codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
                 break;
             case SET:
-                codec.writeOne(DataType.fromType((AbstractType)value), cb);
+                codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
                 break;
             case MAP:
                 List<AbstractType> l = (List<AbstractType>)value;
-                codec.writeOne(DataType.fromType(l.get(0)), cb);
-                codec.writeOne(DataType.fromType(l.get(1)), cb);
+                codec.writeOne(DataType.fromType(l.get(0), version), cb, version);
+                codec.writeOne(DataType.fromType(l.get(1), version), cb, version);
+                break;
+            case UDT:
+                UserType udt = (UserType)value;
+                CBUtil.writeString(udt.keyspace, cb);
+                CBUtil.writeString(UTF8Type.instance.compose(udt.name), cb);
+                cb.writeShort(udt.columnNames.size());
+                for (int i = 0; i < udt.columnNames.size(); i++)
+                {
+                    CBUtil.writeString(UTF8Type.instance.compose(udt.columnNames.get(i)), cb);
+                    codec.writeOne(DataType.fromType(udt.types.get(i), version), cb, version);
+                }
                 break;
         }
     }
 
-    public int serializedValueSize(Object value)
+    public int serializedValueSize(Object value, int version)
     {
         switch (this)
         {
             case CUSTOM:
-                return 2 + ((String)value).getBytes(StandardCharsets.UTF_8).length;
+                return CBUtil.sizeOfString((String)value);
             case LIST:
             case SET:
-                return codec.oneSerializedSize(DataType.fromType((AbstractType)value));
+                return codec.oneSerializedSize(DataType.fromType((AbstractType)value, version), version);
             case MAP:
                 List<AbstractType> l = (List<AbstractType>)value;
                 int s = 0;
-                s += codec.oneSerializedSize(DataType.fromType(l.get(0)));
-                s += codec.oneSerializedSize(DataType.fromType(l.get(1)));
+                s += codec.oneSerializedSize(DataType.fromType(l.get(0), version), version);
+                s += codec.oneSerializedSize(DataType.fromType(l.get(1), version), version);
                 return s;
+            case UDT:
+                UserType udt = (UserType)value;
+                int size = 0;
+                size += CBUtil.sizeOfString(udt.keyspace);
+                size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.name));
+                size += 2;
+                for (int i = 0; i < udt.columnNames.size(); i++)
+                {
+                    size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.columnNames.get(i)));
+                    size += codec.oneSerializedSize(DataType.fromType(udt.types.get(i), version), version);
+                }
+                return size;
             default:
                 return 0;
         }
     }
 
-    public static Pair<DataType, Object> fromType(AbstractType type)
+    public static Pair<DataType, Object> fromType(AbstractType type, int version)
     {
         // For CQL3 clients, ReversedType is an implementation detail and they
         // shouldn't have to care about it.
@@ -170,6 +207,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                     return Pair.<DataType, Object>create(SET, ((SetType)type).elements);
                 }
             }
+
+            if (type instanceof UserType && version >= 3)
+                return Pair.<DataType, Object>create(UDT, type);
+
             return Pair.<DataType, Object>create(CUSTOM, type.toString());
         }
         else
@@ -193,6 +234,8 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                 case MAP:
                     List<AbstractType> l = (List<AbstractType>)entry.right;
                     return MapType.getInstance(l.get(0), l.get(1));
+                case UDT:
+                    return (AbstractType)entry.right;
                 default:
                     return entry.left.type;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 242ad64..7ec026e 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
 public abstract class Event
@@ -33,33 +34,33 @@ public abstract class Event
         this.type = type;
     }
 
-    public static Event deserialize(ByteBuf cb)
+    public static Event deserialize(ByteBuf cb, int version)
     {
         switch (CBUtil.readEnumValue(Type.class, cb))
         {
             case TOPOLOGY_CHANGE:
-                return TopologyChange.deserializeEvent(cb);
+                return TopologyChange.deserializeEvent(cb, version);
             case STATUS_CHANGE:
-                return StatusChange.deserializeEvent(cb);
+                return StatusChange.deserializeEvent(cb, version);
             case SCHEMA_CHANGE:
-                return SchemaChange.deserializeEvent(cb);
+                return SchemaChange.deserializeEvent(cb, version);
         }
         throw new AssertionError();
     }
 
-    public void serialize(ByteBuf dest)
+    public void serialize(ByteBuf dest, int version)
     {
         CBUtil.writeEnumValue(type, dest);
-        serializeEvent(dest);
+        serializeEvent(dest, version);
     }
 
-    public int serializedSize()
+    public int serializedSize(int version)
     {
-        return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+        return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version);
     }
 
-    protected abstract void serializeEvent(ByteBuf dest);
-    protected abstract int eventSerializedSize();
+    protected abstract void serializeEvent(ByteBuf dest, int version);
+    protected abstract int eventSerializedSize(int version);
 
     public static class TopologyChange extends Event
     {
@@ -91,20 +92,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static TopologyChange deserializeEvent(ByteBuf cb)
+        private static TopologyChange deserializeEvent(ByteBuf cb, int version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new TopologyChange(change, node);
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
             CBUtil.writeEnumValue(change, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
             return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
         }
@@ -114,6 +115,23 @@ public abstract class Event
         {
             return change + " " + node;
         }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(change, node);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof TopologyChange))
+                return false;
+
+            TopologyChange tpc = (TopologyChange)other;
+            return Objects.equal(change, tpc.change)
+                && Objects.equal(node, tpc.node);
+        }
     }
 
     public static class StatusChange extends Event
@@ -141,20 +159,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static StatusChange deserializeEvent(ByteBuf cb)
+        private static StatusChange deserializeEvent(ByteBuf cb, int version)
         {
             Status status = CBUtil.readEnumValue(Status.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new StatusChange(status, node);
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
             CBUtil.writeEnumValue(status, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
             return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
         }
@@ -164,56 +182,130 @@ public abstract class Event
         {
             return status + " " + node;
         }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(status, node);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StatusChange))
+                return false;
+
+            StatusChange stc = (StatusChange)other;
+            return Objects.equal(status, stc.status)
+                && Objects.equal(node, stc.node);
+        }
     }
 
     public static class SchemaChange extends Event
     {
         public enum Change { CREATED, UPDATED, DROPPED }
+        public enum Target { KEYSPACE, TABLE, TYPE }
 
         public final Change change;
+        public final Target target;
         public final String keyspace;
-        public final String table;
+        public final String tableOrType;
 
-        public SchemaChange(Change change, String keyspace, String table)
+        public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
         {
             super(Type.SCHEMA_CHANGE);
             this.change = change;
+            this.target = target;
             this.keyspace = keyspace;
-            this.table = table;
+            this.tableOrType = tableOrType;
         }
 
         public SchemaChange(Change change, String keyspace)
         {
-            this(change, keyspace, "");
+            this(change, Target.KEYSPACE, keyspace, null);
         }
 
         // Assumes the type has already been deserialized
-        private static SchemaChange deserializeEvent(ByteBuf cb)
+        private static SchemaChange deserializeEvent(ByteBuf cb, int version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
-            String keyspace = CBUtil.readString(cb);
-            String table = CBUtil.readString(cb);
-            return new SchemaChange(change, keyspace, table);
+            if (version >= 3)
+            {
+                Target target = CBUtil.readEnumValue(Target.class, cb);
+                String keyspace = CBUtil.readString(cb);
+                String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
+                return new SchemaChange(change, target, keyspace, tableOrType);
+            }
+            else
+            {
+                String keyspace = CBUtil.readString(cb);
+                String table = CBUtil.readString(cb);
+                return new SchemaChange(change, table.isEmpty() ? Target.KEYSPACE : Target.TABLE, keyspace, table.isEmpty() ? null : table);
+            }
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
-            CBUtil.writeEnumValue(change, dest);
-            CBUtil.writeString(keyspace, dest);
-            CBUtil.writeString(table, dest);
+            if (version >= 3)
+            {
+                CBUtil.writeEnumValue(change, dest);
+                CBUtil.writeEnumValue(target, dest);
+                CBUtil.writeString(keyspace, dest);
+                if (target != Target.KEYSPACE)
+                    CBUtil.writeString(tableOrType, dest);
+            }
+            else
+            {
+                CBUtil.writeEnumValue(change, dest);
+                CBUtil.writeString(keyspace, dest);
+                CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+            }
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
-            return CBUtil.sizeOfEnumValue(change)
-                 + CBUtil.sizeOfString(keyspace)
-                 + CBUtil.sizeOfString(table);
+            if (version >= 3)
+            {
+                int size = CBUtil.sizeOfEnumValue(change)
+                         + CBUtil.sizeOfEnumValue(target)
+                         + CBUtil.sizeOfString(keyspace);
+
+                if (target != Target.KEYSPACE)
+                    size += CBUtil.sizeOfString(tableOrType);
+
+                return size;
+            }
+            else
+            {
+                return CBUtil.sizeOfEnumValue(change)
+                     + CBUtil.sizeOfString(keyspace)
+                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+            }
         }
 
         @Override
         public String toString()
         {
-            return change + " " + keyspace + (table.isEmpty() ? "" : "." + table);
+            return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(change, target, keyspace, tableOrType);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof SchemaChange))
+                return false;
+
+            SchemaChange scc = (SchemaChange)other;
+            return Objects.equal(change, scc.change)
+                && Objects.equal(target, scc.target)
+                && Objects.equal(keyspace, scc.keyspace)
+                && Objects.equal(tableOrType, scc.tableOrType);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 9b82bda..ec2a1fa 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -32,9 +32,9 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
     {
         public int getId();
 
-        public Object readValue(ByteBuf cb);
-        public void writeValue(Object value, ByteBuf cb);
-        public int serializedValueSize(Object obj);
+        public Object readValue(ByteBuf cb, int version);
+        public void writeValue(Object value, ByteBuf cb, int version);
+        public int serializedValueSize(Object obj, int version);
     }
 
     private final Class<T> klass;
@@ -66,14 +66,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return opt;
     }
 
-    public Map<T, Object> decode(ByteBuf body)
+    public Map<T, Object> decode(ByteBuf body, int version)
     {
         EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
         int n = body.readUnsignedShort();
         for (int i = 0; i < n; i++)
         {
             T opt = fromId(body.readUnsignedShort());
-            Object value = opt.readValue(body);
+            Object value = opt.readValue(body, version);
             if (options.containsKey(opt))
                 throw new ProtocolException(String.format("Duplicate option %s in message", opt.name()));
             options.put(opt, value);
@@ -81,41 +81,41 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return options;
     }
 
-    public ByteBuf encode(Map<T, Object> options)
+    public ByteBuf encode(Map<T, Object> options, int version)
     {
         int optLength = 2;
         for (Map.Entry<T, Object> entry : options.entrySet())
-            optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
+            optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version);
         ByteBuf cb = Unpooled.buffer(optLength);
         cb.writeShort(options.size());
         for (Map.Entry<T, Object> entry : options.entrySet())
         {
             T opt = entry.getKey();
             cb.writeShort(opt.getId());
-            opt.writeValue(entry.getValue(), cb);
+            opt.writeValue(entry.getValue(), cb, version);
         }
         return cb;
     }
 
-    public Pair<T, Object> decodeOne(ByteBuf body)
+    public Pair<T, Object> decodeOne(ByteBuf body, int version)
     {
         T opt = fromId(body.readUnsignedShort());
-        Object value = opt.readValue(body);
+        Object value = opt.readValue(body, version);
         return Pair.create(opt, value);
     }
 
-    public void writeOne(Pair<T, Object> option, ByteBuf dest)
+    public void writeOne(Pair<T, Object> option, ByteBuf dest, int version)
     {
         T opt = option.left;
         Object obj = option.right;
         dest.writeShort(opt.getId());
-        opt.writeValue(obj, dest);
+        opt.writeValue(obj, dest, version);
     }
 
-    public int oneSerializedSize(Pair<T, Object> option)
+    public int oneSerializedSize(Pair<T, Object> option, int version)
     {
         T opt = option.left;
         Object obj = option.right;
-        return 2 + opt.serializedValueSize(obj);
+        return 2 + opt.serializedValueSize(obj, version);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8d08ffd..eb2b043 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -378,7 +378,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onCreateColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onCreateUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
         public void onUpdateKeyspace(String ksName)
@@ -388,7 +393,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onUpdateColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onUpdateUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
         public void onDropKeyspace(String ksName)
@@ -398,7 +408,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onDropColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onDropUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ef56882..3cf9b7b 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -157,7 +157,7 @@ public class SimpleClient
 
     public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
     {
-        Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values)));
+        Message.Response msg = execute(new QueryMessage(query, QueryOptions.forInternalCalls(consistencyLevel, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -171,7 +171,7 @@ public class SimpleClient
 
     public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values)));
+        Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }