You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:22:41 UTC

[2/4] Native protocol v3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d79bd5b..b9ccd1a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -185,23 +185,22 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
-        List<ByteBuffer> variables = options.getValues();
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         cl.validateForRead(keyspace());
 
-        int limit = getLimit(variables);
+        int limit = getLimit(options);
         int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
         Pageable command;
         if (isKeyRange || usesSecondaryIndexing)
         {
-            command = getRangeCommand(variables, limitForQuery, now);
+            command = getRangeCommand(options, limitForQuery, now);
         }
         else
         {
-            List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
             command = commands == null ? null : new Pageable.ReadCommands(commands);
         }
 
@@ -214,13 +213,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
         {
-            return execute(command, cl, variables, limit, now);
+            return execute(command, options, limit, now);
         }
         else
         {
             QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState());
             if (parameters.isCount)
-                return pageCountQuery(pager, variables, pageSize, now, limit);
+                return pageCountQuery(pager, options, pageSize, now, limit);
 
             // We can't properly do post-query ordering if we page (see #6722)
             if (needsPostQueryOrdering())
@@ -228,14 +227,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                                 + "ORDER BY or the IN and sort client side, or disable paging for this query");
 
             List<Row> page = pager.fetchPage(pageSize);
-            ResultMessage.Rows msg = processResults(page, variables, limit, now);
+            ResultMessage.Rows msg = processResults(page, options, limit, now);
             if (!pager.isExhausted())
                 msg.result.metadata.setHasMorePages(pager.state());
             return msg;
         }
     }
 
-    private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now) throws RequestValidationException, RequestExecutionException
     {
         List<Row> rows;
         if (command == null)
@@ -245,21 +244,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         else
         {
             rows = command instanceof Pageable.ReadCommands
-                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
-                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency())
+                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
         }
 
-        return processResults(rows, variables, limit, now);
+        return processResults(rows, options, limit, now);
     }
 
-    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
+    private ResultMessage.Rows pageCountQuery(QueryPager pager, QueryOptions options, int pageSize, long now, int limit) throws RequestValidationException, RequestExecutionException
     {
         int count = 0;
         while (!pager.isExhausted())
         {
             int maxLimit = pager.maxRemaining();
             logger.debug("New maxLimit for paged count query is {}", maxLimit);
-            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
+            ResultSet rset = process(pager.fetchPage(pageSize), options, maxLimit, now);
             count += rset.rows.size();
         }
 
@@ -269,10 +268,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return new ResultMessage.Rows(result);
     }
 
-    public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some column together in sparse column families
-        ResultSet rset = process(rows, variables, limit, now);
+        ResultSet rset = process(rows, options, limit, now);
         rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
         return new ResultMessage.Rows(rset);
     }
@@ -288,29 +287,30 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
     public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = Collections.emptyList();
-        int limit = getLimit(variables);
+        QueryOptions options = QueryOptions.DEFAULT;
+        int limit = getLimit(options);
         int limitForQuery = updateLimitForQuery(limit);
         long now = System.currentTimeMillis();
         List<Row> rows;
         if (isKeyRange || usesSecondaryIndexing)
         {
-            RangeSliceCommand command = getRangeCommand(variables, limitForQuery, now);
+            RangeSliceCommand command = getRangeCommand(options, limitForQuery, now);
             rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
         }
         else
         {
-            List<ReadCommand> commands = getSliceCommands(variables, limitForQuery, now);
+            List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
             rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
         }
 
-        return processResults(rows, variables, limit, now);
+        return processResults(rows, options, limit, now);
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList()), System.currentTimeMillis());
+        QueryOptions options = QueryOptions.DEFAULT;
+        return process(rows, options, getLimit(options), System.currentTimeMillis());
     }
 
     public String keyspace()
@@ -323,15 +323,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return cfm.cfName;
     }
 
-    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
     {
-        Collection<ByteBuffer> keys = getKeys(variables);
+        Collection<ByteBuffer> keys = getKeys(options);
         if (keys.isEmpty()) // in case of IN () for (the last column of) the partition key.
             return null;
 
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
 
-        IDiskAtomFilter filter = makeFilter(variables, limit);
+        IDiskAtomFilter filter = makeFilter(options, limit);
         if (filter == null)
             return null;
 
@@ -349,29 +349,29 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return commands;
     }
 
-    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+    private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
     {
-        IDiskAtomFilter filter = makeFilter(variables, limit);
+        IDiskAtomFilter filter = makeFilter(options, limit);
         if (filter == null)
             return null;
 
-        List<IndexExpression> expressions = getIndexExpressions(variables);
+        List<IndexExpression> expressions = getIndexExpressions(options);
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
-        AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+        AbstractBounds<RowPosition> keyBounds = getKeyBounds(options);
         return keyBounds == null
              ? null
              : new RangeSliceCommand(keyspace(), columnFamily(), now,  filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
     }
 
-    private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException
+    private AbstractBounds<RowPosition> getKeyBounds(QueryOptions options) throws InvalidRequestException
     {
         IPartitioner<?> p = StorageService.getPartitioner();
 
         if (onToken)
         {
-            Token startToken = getTokenBound(Bound.START, variables, p);
-            Token endToken = getTokenBound(Bound.END, variables, p);
+            Token startToken = getTokenBound(Bound.START, options, p);
+            Token endToken = getTokenBound(Bound.END, options, p);
 
             boolean includeStart = includeKeyBound(Bound.START);
             boolean includeEnd = includeKeyBound(Bound.END);
@@ -397,8 +397,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
-            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
+            ByteBuffer startKeyBytes = getKeyBound(Bound.START, options);
+            ByteBuffer finishKeyBytes = getKeyBound(Bound.END, options);
 
             RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
             RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
@@ -421,7 +421,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
     }
 
-    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
+    private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
     throws InvalidRequestException
     {
         if (parameters.isDistinct)
@@ -431,8 +431,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         else if (isColumnRange())
         {
             int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
-            List<Composite> startBounds = getRequestedBound(Bound.START, variables);
-            List<Composite> endBounds = getRequestedBound(Bound.END, variables);
+            List<Composite> startBounds = getRequestedBound(Bound.START, options);
+            List<Composite> endBounds = getRequestedBound(Bound.END, options);
             assert startBounds.size() == endBounds.size();
 
             // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
@@ -516,7 +516,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         }
         else
         {
-            SortedSet<CellName> cellNames = getRequestedColumns(variables);
+            SortedSet<CellName> cellNames = getRequestedColumns(options);
             if (cellNames == null) // in case of IN () for the last column of the key
                 return null;
             QueryProcessor.validateCellNames(cellNames, cfm.comparator);
@@ -534,12 +534,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return new SliceQueryFilter(slices, isReversed, limit, toGroup);
     }
 
-    private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
+    private int getLimit(QueryOptions options) throws InvalidRequestException
     {
         int l = Integer.MAX_VALUE;
         if (limit != null)
         {
-            ByteBuffer b = limit.bindAndGet(variables);
+            ByteBuffer b = limit.bindAndGet(options);
             if (b == null)
                 throw new InvalidRequestException("Invalid null value of limit");
 
@@ -569,7 +569,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
              : limit;
     }
 
-    private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException
+    private Collection<ByteBuffer> getKeys(final QueryOptions options) throws InvalidRequestException
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         CBuilder builder = cfm.getKeyValidatorAsCType().builder();
@@ -578,7 +578,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             Restriction r = keyRestrictions[def.position()];
             assert r != null && !r.isSlice();
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
 
             if (builder.remainingCount() == 1)
             {
@@ -603,7 +603,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return keys;
     }
 
-    private ByteBuffer getKeyBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private ByteBuffer getKeyBound(Bound b, QueryOptions options) throws InvalidRequestException
     {
         // Deal with unrestricted partition key components (special-casing is required to deal with 2i queries on the first
         // component of a composite partition key).
@@ -612,10 +612,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
         // We deal with IN queries for keys in other places, so we know buildBound will return only one result
-        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), variables).get(0).toByteBuffer();
+        return buildBound(b, cfm.partitionKeyColumns(), keyRestrictions, false, cfm.getKeyValidatorAsCType(), options).get(0).toByteBuffer();
     }
 
-    private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
+    private Token getTokenBound(Bound b, QueryOptions options, IPartitioner<?> p) throws InvalidRequestException
     {
         assert onToken;
 
@@ -623,7 +623,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         ByteBuffer value;
         if (keyRestriction.isEQ())
         {
-            value = keyRestriction.values(variables).get(0);
+            value = keyRestriction.values(options).get(0);
         }
         else
         {
@@ -631,7 +631,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (!slice.hasBound(b))
                 return p.getMinimumToken();
 
-            value = slice.bound(b, variables);
+            value = slice.bound(b, options);
         }
 
         if (value == null)
@@ -669,7 +669,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return false;
     }
 
-    private SortedSet<CellName> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
+    private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
     {
         // Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
         // we always do a slice for CQL3 tables, so it's ok to ignore them here
@@ -682,7 +682,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             ColumnDefinition def = idIter.next();
             assert r != null && !r.isSlice();
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
             if (values.size() == 1)
             {
                 ByteBuffer val = values.get(0);
@@ -772,7 +772,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                                               Restriction[] restrictions,
                                               boolean isReversed,
                                               CType type,
-                                              List<ByteBuffer> variables) throws InvalidRequestException
+                                              QueryOptions options) throws InvalidRequestException
     {
         CBuilder builder = type.builder();
 
@@ -801,7 +801,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
             if (r.isSlice())
             {
-                builder.add(getSliceValue(def, r, b, variables));
+                builder.add(getSliceValue(def, r, b, options));
                 Relation.Type relType = ((Restriction.Slice)r).getRelation(eocBound, b);
 
                 // We can have more non null restriction if the "scalar" notation was used for the bound (#4851).
@@ -813,13 +813,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                     if (isNullRestriction(r, b))
                         break;
 
-                    builder.add(getSliceValue(def, r, b, variables));
+                    builder.add(getSliceValue(def, r, b, options));
                 }
                 return Collections.singletonList(builder.build().withEOC(eocForRelation(relType)));
             }
             else
             {
-                List<ByteBuffer> values = r.values(variables);
+                List<ByteBuffer> values = r.values(options);
                 if (values.size() != 1)
                 {
                     // IN query, we only support it on the clustering column
@@ -878,23 +878,23 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return r == null || (r.isSlice() && !((Restriction.Slice)r).hasBound(b));
     }
 
-    private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private static ByteBuffer getSliceValue(ColumnDefinition def, Restriction r, Bound b, QueryOptions options) throws InvalidRequestException
     {
         Restriction.Slice slice = (Restriction.Slice)r;
         assert slice.hasBound(b);
-        ByteBuffer val = slice.bound(b, variables);
+        ByteBuffer val = slice.bound(b, options);
         if (val == null)
             throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
         return val;
     }
 
-    private List<Composite> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+    private List<Composite> getRequestedBound(Bound b, QueryOptions options) throws InvalidRequestException
     {
         assert isColumnRange();
-        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, variables);
+        return buildBound(b, cfm.clusteringColumns(), columnRestrictions, isReversed, cfm.comparator, options);
     }
 
-    public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
+    public List<IndexExpression> getIndexExpressions(QueryOptions options) throws InvalidRequestException
     {
         if (!usesSecondaryIndexing || restrictedColumns.isEmpty())
             return Collections.emptyList();
@@ -927,7 +927,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 {
                     if (slice.hasBound(b))
                     {
-                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
+                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, options));
                         expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
                     }
                 }
@@ -935,12 +935,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             else if (restriction.isContains())
             {
                 Restriction.Contains contains = (Restriction.Contains)restriction;
-                for (ByteBuffer value : contains.values(variables))
+                for (ByteBuffer value : contains.values(options))
                 {
                     validateIndexedValue(def, value);
                     expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
                 }
-                for (ByteBuffer key : contains.keys(variables))
+                for (ByteBuffer key : contains.keys(options))
                 {
                     validateIndexedValue(def, key);
                     expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
@@ -948,7 +948,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                List<ByteBuffer> values = restriction.values(variables);
+                List<ByteBuffer> values = restriction.values(options);
 
                 if (values.size() != 1)
                     throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
@@ -969,13 +969,13 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         return value;
     }
 
-    private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final List<ByteBuffer> variables) throws InvalidRequestException
+    private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
     {
         assert sliceRestriction != null;
 
         final CellNameType type = cfm.comparator;
-        final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, variables));
-        final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, variables));
+        final CellName excludedStart = sliceRestriction.isInclusive(Bound.START) ? null : type.makeCellName(sliceRestriction.bound(Bound.START, options));
+        final CellName excludedEnd = sliceRestriction.isInclusive(Bound.END) ? null : type.makeCellName(sliceRestriction.bound(Bound.END, options));
 
         return new AbstractIterator<Cell>()
         {
@@ -998,7 +998,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         };
     }
 
-    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws InvalidRequestException
+    private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
     {
         Selection.ResultSetBuilder result = selection.resultSetBuilder(now);
         for (org.apache.cassandra.db.Row row : rows)
@@ -1007,12 +1007,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (row.cf == null)
                 continue;
 
-            processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
+            processColumnFamily(row.key.getKey(), row.cf, options, now, result);
         }
 
         ResultSet cqlRows = result.build();
 
-        orderResults(cqlRows, variables);
+        orderResults(cqlRows);
 
         // Internal calls always return columns in the comparator order, even when reverse was set
         if (isReversed)
@@ -1024,7 +1024,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     }
 
     // Used by ModificationStatement for CAS operations
-    void processColumnFamily(ByteBuffer key, ColumnFamily cf, List<ByteBuffer> variables, long now, Selection.ResultSetBuilder result)
+    void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
     throws InvalidRequestException
     {
         CFMetaData cfm = cf.metadata();
@@ -1040,7 +1040,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
 
         Iterator<Cell> cells = cf.getSortedColumns().iterator();
         if (sliceRestriction != null)
-            cells = applySliceRestriction(cells, variables);
+            cells = applySliceRestriction(cells, options);
 
         CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
 
@@ -1059,7 +1059,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         result.add(keyComponents[def.position()]);
                         break;
                     case STATIC:
-                        addValue(result, def, staticRow);
+                        addValue(result, def, staticRow, options);
                         break;
                     default:
                         result.add((ByteBuffer)null);
@@ -1089,17 +1089,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         result.add(cql3Row.getColumn(null));
                         break;
                     case REGULAR:
-                        addValue(result, def, cql3Row);
+                        addValue(result, def, cql3Row, options);
                         break;
                     case STATIC:
-                        addValue(result, def, staticRow);
+                        addValue(result, def, staticRow, options);
                         break;
                 }
             }
         }
     }
 
-    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row)
+    private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
     {
         if (row == null)
         {
@@ -1112,7 +1112,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             List<Cell> collection = row.getCollection(def.name);
             ByteBuffer value = collection == null
                              ? null
-                             : ((CollectionType)def.type).serialize(collection);
+                             : ((CollectionType)def.type).serializeForNativeProtocol(collection, options.getProtocolVersion());
             result.add(value);
             return;
         }
@@ -1137,7 +1137,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     /**
      * Orders results when multiple keys are selected (using IN)
      */
-    private void orderResults(ResultSet cqlRows, List<ByteBuffer> variables) throws InvalidRequestException
+    private void orderResults(ResultSet cqlRows) throws InvalidRequestException
     {
         if (cqlRows.size() == 0 || !needsPostQueryOrdering())
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 6f9a270..3902e05 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -362,7 +362,7 @@ public class DefsTables
                     dropType(type);
 
                 for (MapDifference.ValueDifference<UserType> tdiff : typesDiff.entriesDiffering().values())
-                    addType(tdiff.rightValue()); // use the most recent value
+                    updateType(tdiff.rightValue()); // use the most recent value
             }
         }
     }
@@ -412,7 +412,7 @@ public class DefsTables
         ksm.userTypes.addType(ut);
 
         if (!StorageService.instance.isClientMode())
-            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+            MigrationManager.instance.notifyCreateUserType(ut);
     }
 
     private static void updateKeyspace(KSMetaData newState)
@@ -444,6 +444,19 @@ public class DefsTables
         }
     }
 
+    private static void updateType(UserType ut)
+    {
+        KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace);
+        assert ksm != null;
+
+        logger.info("Updating {}", ut);
+
+        ksm.userTypes.addType(ut);
+
+        if (!StorageService.instance.isClientMode())
+            MigrationManager.instance.notifyUpdateUserType(ut);
+    }
+
     private static void dropKeyspace(String ksName)
     {
         KSMetaData ksm = Schema.instance.getKSMetaData(ksName);
@@ -515,7 +528,7 @@ public class DefsTables
         ksm.userTypes.removeType(ut);
 
         if (!StorageService.instance.isClientMode())
-            MigrationManager.instance.notifyUpdateKeyspace(ksm);
+            MigrationManager.instance.notifyUpdateUserType(ut);
     }
 
     private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 5db4ba0..7f75a5f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -57,7 +58,7 @@ public abstract class CollectionType<T> extends AbstractType<T>
 
     protected abstract void appendToStringBuilder(StringBuilder sb);
 
-    public abstract ByteBuffer serialize(List<Cell> cells);
+    public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
 
     @Override
     public String toString()
@@ -110,22 +111,9 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return true;
     }
 
-    // Utilitary method
-    protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+    protected List<Cell> enforceLimit(List<Cell> cells, int version)
     {
-        ByteBuffer result = ByteBuffer.allocate(2 + size);
-        result.putShort((short)elements);
-        for (ByteBuffer bb : buffers)
-        {
-            result.putShort((short)bb.remaining());
-            result.put(bb.duplicate());
-        }
-        return (ByteBuffer)result.flip();
-    }
-
-    protected List<Cell> enforceLimit(List<Cell> cells)
-    {
-        if (cells.size() <= MAX_ELEMENTS)
+        if (version >= 3 || cells.size() <= MAX_ELEMENTS)
             return cells;
 
         logger.error("Detected collection with {} elements, more than the {} limit. Only the first {} elements will be returned to the client. "
@@ -133,12 +121,11 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return cells.subList(0, MAX_ELEMENTS);
     }
 
-    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+    public ByteBuffer serializeForNativeProtocol(List<Cell> cells, int version)
     {
-        int size = 0;
-        for (ByteBuffer bb : buffers)
-            size += 2 + bb.remaining();
-        return pack(buffers, elements, size);
+        cells = enforceLimit(cells, version);
+        List<ByteBuffer> values = serializedValues(cells);
+        return CollectionSerializer.pack(values, cells.size(), version);
     }
 
     public CQL3Type asCQL3Type()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 43ace65..6e6821b 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -72,7 +72,7 @@ public class ListType<T> extends CollectionType<List<T>>
         return elements;
     }
 
-    public TypeSerializer<List<T>> getSerializer()
+    public ListSerializer<T> getSerializer()
     {
         return serializer;
     }
@@ -112,17 +112,11 @@ public class ListType<T> extends CollectionType<List<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        int size = 0;
         for (Cell c : cells)
-        {
             bbs.add(c.value());
-            size += 2 + c.value().remaining();
-        }
-        return pack(bbs, cells.size(), size);
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 213e213..71023a7 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -108,7 +108,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public TypeSerializer<Map<K, V>> getSerializer()
+    public MapSerializer<K, V> getSerializer()
     {
         return serializer;
     }
@@ -123,23 +123,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Arrays.asList(keys, values)));
     }
 
-    /**
-     * Creates the same output than serialize, but from the internal representation.
-     */
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * cells.size());
-        int size = 0;
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
         for (Cell c : cells)
         {
-            ByteBuffer key = c.name().collectionElement();
-            ByteBuffer value = c.value();
-            bbs.add(key);
-            bbs.add(value);
-            size += 4 + key.remaining() + value.remaining();
+            bbs.add(c.name().collectionElement());
+            bbs.add(c.value());
         }
-        return pack(bbs, cells.size(), size);
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 3b686b8..d2f7f12 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -77,7 +77,7 @@ public class SetType<T> extends CollectionType<Set<T>>
         return ListType.compareListOrSet(elements, o1, o2);
     }
 
-    public TypeSerializer<Set<T>> getSerializer()
+    public SetSerializer<T> getSerializer()
     {
         return serializer;
     }
@@ -92,18 +92,11 @@ public class SetType<T> extends CollectionType<Set<T>>
         sb.append(getClass().getName()).append(TypeParser.stringifyTypeParameters(Collections.<AbstractType<?>>singletonList(elements)));
     }
 
-    public ByteBuffer serialize(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(List<Cell> cells)
     {
-        cells = enforceLimit(cells);
-
         List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        int size = 0;
         for (Cell c : cells)
-        {
-            ByteBuffer key = c.name().collectionElement();
-            bbs.add(key);
-            size += 2 + key.remaining();
-        }
-        return pack(bbs, cells.size(), size);
+            bbs.add(c.name().collectionElement());
+        return bbs;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/db/marshal/UserType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UserType.java b/src/java/org/apache/cassandra/db/marshal/UserType.java
index eb95fb9..973a5be 100644
--- a/src/java/org/apache/cassandra/db/marshal/UserType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UserType.java
@@ -64,6 +64,11 @@ public class UserType extends CompositeType
         return new UserType(keyspace, name, columnNames, columnTypes);
     }
 
+    public String getNameAsString()
+    {
+        return UTF8Type.instance.compose(name);
+    }
+
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index af88853..9e3abcf 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -431,8 +432,10 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         {
             ByteBuffer buffer = objToBB(sub);
             serialized.add(buffer);
-        }      
-        return CollectionType.pack(serialized, objects.size());
+        }
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
     }
 
     private ByteBuffer objToMapBB(List<Object> objects)
@@ -447,7 +450,9 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 serialized.add(buffer);
             }
         } 
-        return CollectionType.pack(serialized, objects.size());
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
     }
 
     private ByteBuffer objToCompositeBB(List<Object> objects)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 9b7a8e7..6993b19 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -195,14 +195,15 @@ public class CQLSSTableWriter
         if (values.size() != boundNames.size())
             throw new InvalidRequestException(String.format("Invalid number of arguments, expecting %d values but got %d", boundNames.size(), values.size()));
 
-        List<ByteBuffer> keys = insert.buildPartitionKeyNames(values);
-        Composite clusteringPrefix = insert.createClusteringPrefix(values);
+        QueryOptions options = QueryOptions.forInternalCalls(null, values);
+        List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
+        Composite clusteringPrefix = insert.createClusteringPrefix(options);
 
         long now = System.currentTimeMillis() * 1000;
         UpdateParameters params = new UpdateParameters(insert.cfm,
-                                                       values,
-                                                       insert.getTimestamp(now, values),
-                                                       insert.getTimeToLive(values),
+                                                       options,
+                                                       insert.getTimestamp(now, options),
+                                                       insert.getTimeToLive(options),
                                                        Collections.<ByteBuffer, CQL3Row>emptyMap());
 
         for (ByteBuffer key: keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 83a391d..0e16fda 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.serializers;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.utils.ByteBufferUtil;
+
 public abstract class CollectionSerializer<T> implements TypeSerializer<T>
 {
     public void validate(ByteBuffer bytes) throws MarshalException
@@ -28,24 +30,104 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T>
         // The collection is not currently being properly validated.
     }
 
-    // Utilitary method
-    protected static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int size)
+    protected abstract List<ByteBuffer> serializeValues(T value);
+    protected abstract int getElementCount(T value);
+
+    public abstract T deserializeForNativeProtocol(ByteBuffer buffer, int version);
+
+    public ByteBuffer serialize(T value)
+    {
+        List<ByteBuffer> values = serializeValues(value);
+        // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+        // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+        return pack(values, getElementCount(value), 3);
+    }
+
+    public T deserialize(ByteBuffer bytes)
     {
-        ByteBuffer result = ByteBuffer.allocate(2 + size);
-        result.putShort((short)elements);
+        // The only case we serialize/deserialize collections internally (i.e. not for the protocol sake),
+        // is when collections are in UDT values. There, we use the protocol 3 version since it's more flexible.
+        return deserializeForNativeProtocol(bytes, 3);
+    }
+
+    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements, int version)
+    {
+        int size = 0;
+        for (ByteBuffer bb : buffers)
+            size += sizeOfValue(bb, version);
+
+        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements, version) + size);
+        writeCollectionSize(result, elements, version);
         for (ByteBuffer bb : buffers)
+            writeValue(result, bb, version);
+        return (ByteBuffer)result.flip();
+    }
+
+    protected static void writeCollectionSize(ByteBuffer output, int elements, int version)
+    {
+        if (version >= 3)
+            output.putInt(elements);
+        else
+            output.putShort((short)elements);
+    }
+
+    protected static int readCollectionSize(ByteBuffer input, int version)
+    {
+        return version >= 3 ? input.getInt() : ByteBufferUtil.readShortLength(input);
+    }
+
+    protected static int sizeOfCollectionSize(int elements, int version)
+    {
+        return version >= 3 ? 4 : 2;
+    }
+
+    protected static void writeValue(ByteBuffer output, ByteBuffer value, int version)
+    {
+        if (version >= 3)
         {
-            result.putShort((short)bb.remaining());
-            result.put(bb.duplicate());
+            if (value == null)
+            {
+                output.putInt(-1);
+                return;
+            }
+
+            output.putInt(value.remaining());
+            output.put(value.duplicate());
+        }
+        else
+        {
+            assert value != null;
+            output.putShort((short)value.remaining());
+            output.put(value.duplicate());
         }
-        return (ByteBuffer)result.flip();
     }
 
-    public static ByteBuffer pack(List<ByteBuffer> buffers, int elements)
+    protected static ByteBuffer readValue(ByteBuffer input, int version)
     {
-        int size = 0;
-        for (ByteBuffer bb : buffers)
-            size += 2 + bb.remaining();
-        return pack(buffers, elements, size);
+        if (version >= 3)
+        {
+            int size = input.getInt();
+            if (size < 0)
+                return null;
+
+            return ByteBufferUtil.readBytes(input, size);
+        }
+        else
+        {
+            return ByteBufferUtil.readBytesWithShortLength(input);
+        }
+    }
+
+    protected static int sizeOfValue(ByteBuffer value, int version)
+    {
+        if (version >= 3)
+        {
+            return value == null ? 4 : 4 + value.remaining();
+        }
+        else
+        {
+            assert value != null;
+            return 2 + value.remaining();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/ListSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index 59f25d2..e662341 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -47,16 +47,29 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         this.elements = elements;
     }
 
-    public List<T> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(List<T> values)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(values.size());
+        for (T value : values)
+            buffers.add(elements.serialize(value));
+        return buffers;
+    }
+
+    public int getElementCount(List<T> value)
+    {
+        return value.size();
+    }
+
+    public List<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             List<T> l = new ArrayList<T>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer databb = readValue(input, version);
                 elements.validate(databb);
                 l.add(elements.deserialize(databb));
             }
@@ -68,26 +81,6 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
-     * where:
-     *   n is the number of elements
-     *   s_i is the number of bytes composing the ith element
-     *   b_i is the s_i bytes composing the ith element
-     */
-    public ByteBuffer serialize(List<T> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
-        int size = 0;
-        for (T elt : value)
-        {
-            ByteBuffer bb = elements.serialize(elt);
-            bbs.add(bb);
-            size += 2 + bb.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(List<T> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/MapSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java
index f79d07f..5d349dd 100644
--- a/src/java/org/apache/cassandra/serializers/MapSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java
@@ -51,19 +51,35 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         this.values = values;
     }
 
-    public Map<K, V> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(Map<K, V> map)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(map.size() * 2);
+        for (Map.Entry<K, V> entry : map.entrySet())
+        {
+            buffers.add(keys.serialize(entry.getKey()));
+            buffers.add(values.serialize(entry.getValue()));
+        }
+        return buffers;
+    }
+
+    public int getElementCount(Map<K, V> value)
+    {
+        return value.size();
+    }
+
+    public Map<K, V> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             Map<K, V> m = new LinkedHashMap<K, V>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer kbb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer kbb = readValue(input, version);
                 keys.validate(kbb);
 
-                ByteBuffer vbb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer vbb = readValue(input, version);
                 values.validate(vbb);
 
                 m.put(keys.deserialize(kbb), values.deserialize(vbb));
@@ -76,30 +92,6 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><sk_1><k_1><sv_1><v_1>...<sk_n><k_n><sv_n><v_n> }
-     * where:
-     *   n is the number of elements
-     *   sk_i is the number of bytes composing the ith key k_i
-     *   k_i is the sk_i bytes composing the ith key
-     *   sv_i is the number of bytes composing the ith value v_i
-     *   v_i is the sv_i bytes composing the ith value
-     */
-    public ByteBuffer serialize(Map<K, V> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(2 * value.size());
-        int size = 0;
-        for (Map.Entry<K, V> entry : value.entrySet())
-        {
-            ByteBuffer bbk = keys.serialize(entry.getKey());
-            ByteBuffer bbv = values.serialize(entry.getValue());
-            bbs.add(bbk);
-            bbs.add(bbv);
-            size += 4 + bbk.remaining() + bbv.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(Map<K, V> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/serializers/SetSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java
index d6d7062..812dd68 100644
--- a/src/java/org/apache/cassandra/serializers/SetSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java
@@ -47,16 +47,29 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         this.elements = elements;
     }
 
-    public Set<T> deserialize(ByteBuffer bytes)
+    public List<ByteBuffer> serializeValues(Set<T> values)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(values.size());
+        for (T value : values)
+            buffers.add(elements.serialize(value));
+        return buffers;
+    }
+
+    public int getElementCount(Set<T> value)
+    {
+        return value.size();
+    }
+
+    public Set<T> deserializeForNativeProtocol(ByteBuffer bytes, int version)
     {
         try
         {
             ByteBuffer input = bytes.duplicate();
-            int n = ByteBufferUtil.readShortLength(input);
+            int n = readCollectionSize(input, version);
             Set<T> l = new LinkedHashSet<T>(n);
             for (int i = 0; i < n; i++)
             {
-                ByteBuffer databb = ByteBufferUtil.readBytesWithShortLength(input);
+                ByteBuffer databb = readValue(input, version);
                 elements.validate(databb);
                 l.add(elements.deserialize(databb));
             }
@@ -68,26 +81,6 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>>
         }
     }
 
-    /**
-     * Layout is: {@code <n><s_1><b_1>...<s_n><b_n> }
-     * where:
-     *   n is the number of elements
-     *   s_i is the number of bytes composing the ith element
-     *   b_i is the s_i bytes composing the ith element
-     */
-    public ByteBuffer serialize(Set<T> value)
-    {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(value.size());
-        int size = 0;
-        for (T elt : value)
-        {
-            ByteBuffer bb = elements.serialize(elt);
-            bbs.add(bb);
-            size += 2 + bb.remaining();
-        }
-        return pack(bbs, value.size(), size);
-    }
-
     public String toString(Set<T> value)
     {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/IMigrationListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java
index e16ac62..4d142bd 100644
--- a/src/java/org/apache/cassandra/service/IMigrationListener.java
+++ b/src/java/org/apache/cassandra/service/IMigrationListener.java
@@ -21,10 +21,13 @@ public interface IMigrationListener
 {
     public void onCreateKeyspace(String ksName);
     public void onCreateColumnFamily(String ksName, String cfName);
+    public void onCreateUserType(String ksName, String typeName);
 
     public void onUpdateKeyspace(String ksName);
     public void onUpdateColumnFamily(String ksName, String cfName);
+    public void onUpdateUserType(String ksName, String typeName);
 
     public void onDropKeyspace(String ksName);
     public void onDropColumnFamily(String ksName, String cfName);
+    public void onDropUserType(String ksName, String typeName);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 7eb7282..ec46d3f 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -167,6 +167,12 @@ public class MigrationManager
             listener.onCreateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyCreateUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onCreateUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public void notifyUpdateKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
@@ -179,6 +185,12 @@ public class MigrationManager
             listener.onUpdateColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyUpdateUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onUpdateUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public void notifyDropKeyspace(KSMetaData ksm)
     {
         for (IMigrationListener listener : listeners)
@@ -191,6 +203,12 @@ public class MigrationManager
             listener.onDropColumnFamily(cfm.ksName, cfm.cfName);
     }
 
+    public void notifyDropUserType(UserType ut)
+    {
+        for (IMigrationListener listener : listeners)
+            listener.onDropUserType(ut.keyspace, ut.getNameAsString());
+    }
+
     public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException
     {
         announceNewKeyspace(ksm, FBUtilities.timestampMicros());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index c4abe0b..3040aaf 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -1969,7 +1969,7 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             ThriftClientState cState = state();
-            return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
+            return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2100,7 +2100,7 @@ public class CassandraServer implements Cassandra.Iface
 
             return cState.getCQLQueryHandler().processPrepared(statement,
                                                                cState.getQueryState(),
-                                                               new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
+                                                               QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
         }
         catch (RequestExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index e5222a1..36a7e71 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -363,6 +364,22 @@ public abstract class CBUtil
         return size;
     }
 
+    public static Pair<List<String>, List<ByteBuffer>> readNameAndValueList(ByteBuf cb)
+    {
+        int size = cb.readUnsignedShort();
+        if (size == 0)
+            return Pair.create(Collections.<String>emptyList(), Collections.<ByteBuffer>emptyList());
+
+        List<String> s = new ArrayList<>(size);
+        List<ByteBuffer> l = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+        {
+            s.add(readString(cb));
+            l.add(readValue(cb));
+        }
+        return Pair.create(s, l);
+    }
+
     public static InetSocketAddress readInet(ByteBuf cb)
     {
         int addrSize = cb.readByte();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4a50bde..989b954 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -128,7 +128,7 @@ public class Client extends SimpleClient
                     return null;
                 }
             }
-            return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
+            return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null));
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -156,7 +156,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values));
+                return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.forInternalCalls(ConsistencyLevel.ONE, values));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/DataType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/DataType.java b/src/java/org/apache/cassandra/transport/DataType.java
index f0b5d95..3cff973 100644
--- a/src/java/org/apache/cassandra/transport/DataType.java
+++ b/src/java/org/apache/cassandra/transport/DataType.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.transport;
 
-import java.nio.charset.StandardCharsets;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -51,7 +51,9 @@ public enum DataType implements OptionCodec.Codecable<DataType>
     INET     (16, InetAddressType.instance),
     LIST     (32, null),
     MAP      (33, null),
-    SET      (34, null);
+    SET      (34, null),
+    UDT      (48, null);
+
 
     public static final OptionCodec<DataType> codec = new OptionCodec<DataType>(DataType.class);
 
@@ -78,27 +80,39 @@ public enum DataType implements OptionCodec.Codecable<DataType>
         return id;
     }
 
-    public Object readValue(ByteBuf cb)
+    public Object readValue(ByteBuf cb, int version)
     {
         switch (this)
         {
             case CUSTOM:
                 return CBUtil.readString(cb);
             case LIST:
-                return DataType.toType(codec.decodeOne(cb));
+                return DataType.toType(codec.decodeOne(cb, version));
             case SET:
-                return DataType.toType(codec.decodeOne(cb));
+                return DataType.toType(codec.decodeOne(cb, version));
             case MAP:
                 List<AbstractType> l = new ArrayList<AbstractType>(2);
-                l.add(DataType.toType(codec.decodeOne(cb)));
-                l.add(DataType.toType(codec.decodeOne(cb)));
+                l.add(DataType.toType(codec.decodeOne(cb, version)));
+                l.add(DataType.toType(codec.decodeOne(cb, version)));
                 return l;
+            case UDT:
+                String ks = CBUtil.readString(cb);
+                ByteBuffer name = UTF8Type.instance.decompose(CBUtil.readString(cb));
+                int n = cb.readUnsignedShort();
+                List<ByteBuffer> fieldNames = new ArrayList<>(n);
+                List<AbstractType<?>> fieldTypes = new ArrayList<>(n);
+                for (int i = 0; i < n; i++)
+                {
+                    fieldNames.add(UTF8Type.instance.decompose(CBUtil.readString(cb)));
+                    fieldTypes.add(DataType.toType(codec.decodeOne(cb, version)));
+                }
+                return new UserType(ks, name, fieldNames, fieldTypes);
             default:
                 return null;
         }
     }
 
-    public void writeValue(Object value, ByteBuf cb)
+    public void writeValue(Object value, ByteBuf cb, int version)
     {
         switch (this)
         {
@@ -107,40 +121,63 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                 CBUtil.writeString((String)value, cb);
                 break;
             case LIST:
-                codec.writeOne(DataType.fromType((AbstractType)value), cb);
+                codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
                 break;
             case SET:
-                codec.writeOne(DataType.fromType((AbstractType)value), cb);
+                codec.writeOne(DataType.fromType((AbstractType)value, version), cb, version);
                 break;
             case MAP:
                 List<AbstractType> l = (List<AbstractType>)value;
-                codec.writeOne(DataType.fromType(l.get(0)), cb);
-                codec.writeOne(DataType.fromType(l.get(1)), cb);
+                codec.writeOne(DataType.fromType(l.get(0), version), cb, version);
+                codec.writeOne(DataType.fromType(l.get(1), version), cb, version);
+                break;
+            case UDT:
+                UserType udt = (UserType)value;
+                CBUtil.writeString(udt.keyspace, cb);
+                CBUtil.writeString(UTF8Type.instance.compose(udt.name), cb);
+                cb.writeShort(udt.columnNames.size());
+                for (int i = 0; i < udt.columnNames.size(); i++)
+                {
+                    CBUtil.writeString(UTF8Type.instance.compose(udt.columnNames.get(i)), cb);
+                    codec.writeOne(DataType.fromType(udt.types.get(i), version), cb, version);
+                }
                 break;
         }
     }
 
-    public int serializedValueSize(Object value)
+    public int serializedValueSize(Object value, int version)
     {
         switch (this)
         {
             case CUSTOM:
-                return 2 + ((String)value).getBytes(StandardCharsets.UTF_8).length;
+                return CBUtil.sizeOfString((String)value);
             case LIST:
             case SET:
-                return codec.oneSerializedSize(DataType.fromType((AbstractType)value));
+                return codec.oneSerializedSize(DataType.fromType((AbstractType)value, version), version);
             case MAP:
                 List<AbstractType> l = (List<AbstractType>)value;
                 int s = 0;
-                s += codec.oneSerializedSize(DataType.fromType(l.get(0)));
-                s += codec.oneSerializedSize(DataType.fromType(l.get(1)));
+                s += codec.oneSerializedSize(DataType.fromType(l.get(0), version), version);
+                s += codec.oneSerializedSize(DataType.fromType(l.get(1), version), version);
                 return s;
+            case UDT:
+                UserType udt = (UserType)value;
+                int size = 0;
+                size += CBUtil.sizeOfString(udt.keyspace);
+                size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.name));
+                size += 2;
+                for (int i = 0; i < udt.columnNames.size(); i++)
+                {
+                    size += CBUtil.sizeOfString(UTF8Type.instance.compose(udt.columnNames.get(i)));
+                    size += codec.oneSerializedSize(DataType.fromType(udt.types.get(i), version), version);
+                }
+                return size;
             default:
                 return 0;
         }
     }
 
-    public static Pair<DataType, Object> fromType(AbstractType type)
+    public static Pair<DataType, Object> fromType(AbstractType type, int version)
     {
         // For CQL3 clients, ReversedType is an implementation detail and they
         // shouldn't have to care about it.
@@ -170,6 +207,10 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                     return Pair.<DataType, Object>create(SET, ((SetType)type).elements);
                 }
             }
+
+            if (type instanceof UserType && version >= 3)
+                return Pair.<DataType, Object>create(UDT, type);
+
             return Pair.<DataType, Object>create(CUSTOM, type.toString());
         }
         else
@@ -193,6 +234,8 @@ public enum DataType implements OptionCodec.Codecable<DataType>
                 case MAP:
                     List<AbstractType> l = (List<AbstractType>)entry.right;
                     return MapType.getInstance(l.get(0), l.get(1));
+                case UDT:
+                    return (AbstractType)entry.right;
                 default:
                     return entry.left.type;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 242ad64..7ec026e 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.transport;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
 public abstract class Event
@@ -33,33 +34,33 @@ public abstract class Event
         this.type = type;
     }
 
-    public static Event deserialize(ByteBuf cb)
+    public static Event deserialize(ByteBuf cb, int version)
     {
         switch (CBUtil.readEnumValue(Type.class, cb))
         {
             case TOPOLOGY_CHANGE:
-                return TopologyChange.deserializeEvent(cb);
+                return TopologyChange.deserializeEvent(cb, version);
             case STATUS_CHANGE:
-                return StatusChange.deserializeEvent(cb);
+                return StatusChange.deserializeEvent(cb, version);
             case SCHEMA_CHANGE:
-                return SchemaChange.deserializeEvent(cb);
+                return SchemaChange.deserializeEvent(cb, version);
         }
         throw new AssertionError();
     }
 
-    public void serialize(ByteBuf dest)
+    public void serialize(ByteBuf dest, int version)
     {
         CBUtil.writeEnumValue(type, dest);
-        serializeEvent(dest);
+        serializeEvent(dest, version);
     }
 
-    public int serializedSize()
+    public int serializedSize(int version)
     {
-        return CBUtil.sizeOfEnumValue(type) + eventSerializedSize();
+        return CBUtil.sizeOfEnumValue(type) + eventSerializedSize(version);
     }
 
-    protected abstract void serializeEvent(ByteBuf dest);
-    protected abstract int eventSerializedSize();
+    protected abstract void serializeEvent(ByteBuf dest, int version);
+    protected abstract int eventSerializedSize(int version);
 
     public static class TopologyChange extends Event
     {
@@ -91,20 +92,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static TopologyChange deserializeEvent(ByteBuf cb)
+        private static TopologyChange deserializeEvent(ByteBuf cb, int version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new TopologyChange(change, node);
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
             CBUtil.writeEnumValue(change, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
             return CBUtil.sizeOfEnumValue(change) + CBUtil.sizeOfInet(node);
         }
@@ -114,6 +115,23 @@ public abstract class Event
         {
             return change + " " + node;
         }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(change, node);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof TopologyChange))
+                return false;
+
+            TopologyChange tpc = (TopologyChange)other;
+            return Objects.equal(change, tpc.change)
+                && Objects.equal(node, tpc.node);
+        }
     }
 
     public static class StatusChange extends Event
@@ -141,20 +159,20 @@ public abstract class Event
         }
 
         // Assumes the type has already been deserialized
-        private static StatusChange deserializeEvent(ByteBuf cb)
+        private static StatusChange deserializeEvent(ByteBuf cb, int version)
         {
             Status status = CBUtil.readEnumValue(Status.class, cb);
             InetSocketAddress node = CBUtil.readInet(cb);
             return new StatusChange(status, node);
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
             CBUtil.writeEnumValue(status, dest);
             CBUtil.writeInet(node, dest);
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
             return CBUtil.sizeOfEnumValue(status) + CBUtil.sizeOfInet(node);
         }
@@ -164,56 +182,130 @@ public abstract class Event
         {
             return status + " " + node;
         }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(status, node);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof StatusChange))
+                return false;
+
+            StatusChange stc = (StatusChange)other;
+            return Objects.equal(status, stc.status)
+                && Objects.equal(node, stc.node);
+        }
     }
 
     public static class SchemaChange extends Event
     {
         public enum Change { CREATED, UPDATED, DROPPED }
+        public enum Target { KEYSPACE, TABLE, TYPE }
 
         public final Change change;
+        public final Target target;
         public final String keyspace;
-        public final String table;
+        public final String tableOrType;
 
-        public SchemaChange(Change change, String keyspace, String table)
+        public SchemaChange(Change change, Target target, String keyspace, String tableOrType)
         {
             super(Type.SCHEMA_CHANGE);
             this.change = change;
+            this.target = target;
             this.keyspace = keyspace;
-            this.table = table;
+            this.tableOrType = tableOrType;
         }
 
         public SchemaChange(Change change, String keyspace)
         {
-            this(change, keyspace, "");
+            this(change, Target.KEYSPACE, keyspace, null);
         }
 
         // Assumes the type has already been deserialized
-        private static SchemaChange deserializeEvent(ByteBuf cb)
+        private static SchemaChange deserializeEvent(ByteBuf cb, int version)
         {
             Change change = CBUtil.readEnumValue(Change.class, cb);
-            String keyspace = CBUtil.readString(cb);
-            String table = CBUtil.readString(cb);
-            return new SchemaChange(change, keyspace, table);
+            if (version >= 3)
+            {
+                Target target = CBUtil.readEnumValue(Target.class, cb);
+                String keyspace = CBUtil.readString(cb);
+                String tableOrType = target == Target.KEYSPACE ? null : CBUtil.readString(cb);
+                return new SchemaChange(change, target, keyspace, tableOrType);
+            }
+            else
+            {
+                String keyspace = CBUtil.readString(cb);
+                String table = CBUtil.readString(cb);
+                return new SchemaChange(change, table.isEmpty() ? Target.KEYSPACE : Target.TABLE, keyspace, table.isEmpty() ? null : table);
+            }
         }
 
-        protected void serializeEvent(ByteBuf dest)
+        protected void serializeEvent(ByteBuf dest, int version)
         {
-            CBUtil.writeEnumValue(change, dest);
-            CBUtil.writeString(keyspace, dest);
-            CBUtil.writeString(table, dest);
+            if (version >= 3)
+            {
+                CBUtil.writeEnumValue(change, dest);
+                CBUtil.writeEnumValue(target, dest);
+                CBUtil.writeString(keyspace, dest);
+                if (target != Target.KEYSPACE)
+                    CBUtil.writeString(tableOrType, dest);
+            }
+            else
+            {
+                CBUtil.writeEnumValue(change, dest);
+                CBUtil.writeString(keyspace, dest);
+                CBUtil.writeString(target == Target.KEYSPACE ? "" : tableOrType, dest);
+            }
         }
 
-        protected int eventSerializedSize()
+        protected int eventSerializedSize(int version)
         {
-            return CBUtil.sizeOfEnumValue(change)
-                 + CBUtil.sizeOfString(keyspace)
-                 + CBUtil.sizeOfString(table);
+            if (version >= 3)
+            {
+                int size = CBUtil.sizeOfEnumValue(change)
+                         + CBUtil.sizeOfEnumValue(target)
+                         + CBUtil.sizeOfString(keyspace);
+
+                if (target != Target.KEYSPACE)
+                    size += CBUtil.sizeOfString(tableOrType);
+
+                return size;
+            }
+            else
+            {
+                return CBUtil.sizeOfEnumValue(change)
+                     + CBUtil.sizeOfString(keyspace)
+                     + CBUtil.sizeOfString(target == Target.KEYSPACE ? "" : tableOrType);
+            }
         }
 
         @Override
         public String toString()
         {
-            return change + " " + keyspace + (table.isEmpty() ? "" : "." + table);
+            return change + " " + target + " " + keyspace + (tableOrType == null ? "" : "." + tableOrType);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hashCode(change, target, keyspace, tableOrType);
+        }
+
+        @Override
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof SchemaChange))
+                return false;
+
+            SchemaChange scc = (SchemaChange)other;
+            return Objects.equal(change, scc.change)
+                && Objects.equal(target, scc.target)
+                && Objects.equal(keyspace, scc.keyspace)
+                && Objects.equal(tableOrType, scc.tableOrType);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/OptionCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/OptionCodec.java b/src/java/org/apache/cassandra/transport/OptionCodec.java
index 9b82bda..ec2a1fa 100644
--- a/src/java/org/apache/cassandra/transport/OptionCodec.java
+++ b/src/java/org/apache/cassandra/transport/OptionCodec.java
@@ -32,9 +32,9 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
     {
         public int getId();
 
-        public Object readValue(ByteBuf cb);
-        public void writeValue(Object value, ByteBuf cb);
-        public int serializedValueSize(Object obj);
+        public Object readValue(ByteBuf cb, int version);
+        public void writeValue(Object value, ByteBuf cb, int version);
+        public int serializedValueSize(Object obj, int version);
     }
 
     private final Class<T> klass;
@@ -66,14 +66,14 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return opt;
     }
 
-    public Map<T, Object> decode(ByteBuf body)
+    public Map<T, Object> decode(ByteBuf body, int version)
     {
         EnumMap<T, Object> options = new EnumMap<T, Object>(klass);
         int n = body.readUnsignedShort();
         for (int i = 0; i < n; i++)
         {
             T opt = fromId(body.readUnsignedShort());
-            Object value = opt.readValue(body);
+            Object value = opt.readValue(body, version);
             if (options.containsKey(opt))
                 throw new ProtocolException(String.format("Duplicate option %s in message", opt.name()));
             options.put(opt, value);
@@ -81,41 +81,41 @@ public class OptionCodec<T extends Enum<T> & OptionCodec.Codecable<T>>
         return options;
     }
 
-    public ByteBuf encode(Map<T, Object> options)
+    public ByteBuf encode(Map<T, Object> options, int version)
     {
         int optLength = 2;
         for (Map.Entry<T, Object> entry : options.entrySet())
-            optLength += 2 + entry.getKey().serializedValueSize(entry.getValue());
+            optLength += 2 + entry.getKey().serializedValueSize(entry.getValue(), version);
         ByteBuf cb = Unpooled.buffer(optLength);
         cb.writeShort(options.size());
         for (Map.Entry<T, Object> entry : options.entrySet())
         {
             T opt = entry.getKey();
             cb.writeShort(opt.getId());
-            opt.writeValue(entry.getValue(), cb);
+            opt.writeValue(entry.getValue(), cb, version);
         }
         return cb;
     }
 
-    public Pair<T, Object> decodeOne(ByteBuf body)
+    public Pair<T, Object> decodeOne(ByteBuf body, int version)
     {
         T opt = fromId(body.readUnsignedShort());
-        Object value = opt.readValue(body);
+        Object value = opt.readValue(body, version);
         return Pair.create(opt, value);
     }
 
-    public void writeOne(Pair<T, Object> option, ByteBuf dest)
+    public void writeOne(Pair<T, Object> option, ByteBuf dest, int version)
     {
         T opt = option.left;
         Object obj = option.right;
         dest.writeShort(opt.getId());
-        opt.writeValue(obj, dest);
+        opt.writeValue(obj, dest, version);
     }
 
-    public int oneSerializedSize(Pair<T, Object> option)
+    public int oneSerializedSize(Pair<T, Object> option, int version)
     {
         T opt = option.left;
         Object obj = option.right;
-        return 2 + opt.serializedValueSize(obj);
+        return 2 + opt.serializedValueSize(obj, version);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8d08ffd..eb2b043 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -378,7 +378,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onCreateColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onCreateUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
         public void onUpdateKeyspace(String ksName)
@@ -388,7 +393,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onUpdateColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onUpdateUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
 
         public void onDropKeyspace(String ksName)
@@ -398,7 +408,12 @@ public class Server implements CassandraDaemon.Server
 
         public void onDropColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName));
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
+        }
+
+        public void onDropUserType(String ksName, String typeName)
+        {
+            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ef56882..3cf9b7b 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -157,7 +157,7 @@ public class SimpleClient
 
     public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
     {
-        Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values)));
+        Message.Response msg = execute(new QueryMessage(query, QueryOptions.forInternalCalls(consistencyLevel, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -171,7 +171,7 @@ public class SimpleClient
 
     public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values)));
+        Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), QueryOptions.forInternalCalls(consistency, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }