You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:22:42 UTC

[3/4] Native protocol v3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 76b1eeb..12accaf 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -25,120 +26,244 @@ import java.util.List;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.transport.CBCodec;
 import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Options for a query.
  */
-public class QueryOptions
+public abstract class QueryOptions
 {
-    public static final QueryOptions DEFAULT = new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+    public static final QueryOptions DEFAULT = new DefaultQueryOptions(ConsistencyLevel.ONE,
+                                                                       Collections.<ByteBuffer>emptyList(),
+                                                                       false,
+                                                                       SpecificOptions.DEFAULT,
+                                                                       3);
 
     public static final CBCodec<QueryOptions> codec = new Codec();
 
-    private final ConsistencyLevel consistency;
-    private final List<ByteBuffer> values;
-    private final boolean skipMetadata;
-
-    private final SpecificOptions options;
+    public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+    {
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+    }
 
-    // The protocol version of incoming queries. This is set during deserializaion and will be 0
-    // if the QueryOptions does not come from a user message (or come from thrift).
-    private final transient int protocolVersion;
+    public static QueryOptions fromProtocolV2(ConsistencyLevel consistency, List<ByteBuffer> values)
+    {
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 2);
+    }
 
-    public QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values)
+    public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
     {
-        this(consistency, values, false, SpecificOptions.DEFAULT, 0);
+        return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 0);
     }
 
-    public QueryOptions(ConsistencyLevel consistency,
-                        List<ByteBuffer> values,
-                        boolean skipMetadata,
-                        int pageSize,
-                        PagingState pagingState,
-                        ConsistencyLevel serialConsistency)
+    public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
     {
-        this(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency), 0);
+        return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
     }
 
-    private QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+    public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency)
     {
-        this.consistency = consistency;
-        this.values = values;
-        this.skipMetadata = skipMetadata;
-        this.options = options;
-        this.protocolVersion = protocolVersion;
+        return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), 0);
     }
 
-    public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+    public abstract ConsistencyLevel getConsistency();
+    public abstract List<ByteBuffer> getValues();
+    public abstract boolean skipMetadata();
+
+    /**  The pageSize for this query. Will be <= 0 if not relevant for the query.  */
+    public int getPageSize()
     {
-        return new QueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+        return getSpecificOptions().pageSize;
     }
 
-    public ConsistencyLevel getConsistency()
+    /** The paging state for this query, or null if not relevant. */
+    public PagingState getPagingState()
     {
-        return consistency;
+        return getSpecificOptions().state;
     }
 
-    public List<ByteBuffer> getValues()
+    /**  Serial consistency for conditional updates. */
+    public ConsistencyLevel getSerialConsistency()
     {
-        return values;
+        return getSpecificOptions().serialConsistency;
     }
 
-    public boolean skipMetadata()
+    public long getTimestamp(QueryState state)
     {
-        return skipMetadata;
+        long tstamp = getSpecificOptions().timestamp;
+        return tstamp >= 0 ? tstamp : state.getTimestamp();
     }
 
     /**
-     * The pageSize for this query. Will be <= 0 if not relevant for the query.
+     * The protocol version for the query. Will be 3 if the object don't come from
+     * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
      */
-    public int getPageSize()
+    public abstract int getProtocolVersion();
+
+    public abstract QueryOptions withProtocolVersion(int version);
+
+    // Mainly for the sake of BatchQueryOptions
+    abstract SpecificOptions getSpecificOptions();
+
+    public QueryOptions prepare(List<ColumnSpecification> specs)
     {
-        return options.pageSize;
+        return this;
     }
 
-    /**
-     * The paging state for this query, or null if not relevant.
-     */
-    public PagingState getPagingState()
+    static class DefaultQueryOptions extends QueryOptions
     {
-        return options.state;
+        private final ConsistencyLevel consistency;
+        private final List<ByteBuffer> values;
+        private final boolean skipMetadata;
+
+        private final SpecificOptions options;
+
+        // The protocol version of incoming queries. This is set during deserializaion and will be 0
+        // if the QueryOptions does not come from a user message (or come from thrift).
+        private final transient int protocolVersion;
+
+        DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+        {
+            this.consistency = consistency;
+            this.values = values;
+            this.skipMetadata = skipMetadata;
+            this.options = options;
+            this.protocolVersion = protocolVersion;
+        }
+
+        public QueryOptions withProtocolVersion(int version)
+        {
+            return new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+        }
+
+        public ConsistencyLevel getConsistency()
+        {
+            return consistency;
+        }
+
+        public List<ByteBuffer> getValues()
+        {
+            return values;
+        }
+
+        public boolean skipMetadata()
+        {
+            return skipMetadata;
+        }
+
+        public int getProtocolVersion()
+        {
+            return protocolVersion;
+        }
+
+        SpecificOptions getSpecificOptions()
+        {
+            return options;
+        }
     }
 
-    /**
-     * Serial consistency for conditional updates.
-     */
-    public ConsistencyLevel getSerialConsistency()
+    static abstract class QueryOptionsWrapper extends QueryOptions
     {
-        return options.serialConsistency;
+        protected final QueryOptions wrapped;
+
+        QueryOptionsWrapper(QueryOptions wrapped)
+        {
+            this.wrapped = wrapped;
+        }
+
+        public ConsistencyLevel getConsistency()
+        {
+            return wrapped.getConsistency();
+        }
+
+        public boolean skipMetadata()
+        {
+            return wrapped.skipMetadata();
+        }
+
+        public int getProtocolVersion()
+        {
+            return wrapped.getProtocolVersion();
+        }
+
+        SpecificOptions getSpecificOptions()
+        {
+            return wrapped.getSpecificOptions();
+        }
+
+        @Override
+        public QueryOptions prepare(List<ColumnSpecification> specs)
+        {
+            wrapped.prepare(specs);
+            return this;
+        }
+
+        public QueryOptions withProtocolVersion(int version)
+        {
+            return new DefaultQueryOptions(getConsistency(), getValues(), skipMetadata(),  getSpecificOptions(), version);
+        }
     }
 
-    /**
-     * The protocol version for the query. Will be 0 if the object don't come from
-     * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
-     */
-    public int getProtocolVersion()
+    static class OptionsWithNames extends QueryOptionsWrapper
     {
-        return protocolVersion;
+        private final List<String> names;
+        private List<ByteBuffer> orderedValues;
+
+        OptionsWithNames(DefaultQueryOptions wrapped, List<String> names)
+        {
+            super(wrapped);
+            this.names = names;
+        }
+
+        @Override
+        public QueryOptions prepare(List<ColumnSpecification> specs)
+        {
+            super.prepare(specs);
+
+            orderedValues = new ArrayList<ByteBuffer>(specs.size());
+            for (int i = 0; i < specs.size(); i++)
+            {
+                String name = specs.get(i).name.toString();
+                for (int j = 0; j < names.size(); j++)
+                {
+                    if (name.equals(names.get(j)))
+                    {
+                        orderedValues.add(wrapped.getValues().get(j));
+                        break;
+                    }
+                }
+            }
+            return this;
+        }
+
+        public List<ByteBuffer> getValues()
+        {
+            assert orderedValues != null; // We should have called prepare first!
+            return orderedValues;
+        }
     }
 
     // Options that are likely to not be present in most queries
-    private static class SpecificOptions
+    static class SpecificOptions
     {
-        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null);
+        private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, -1L);
 
         private final int pageSize;
         private final PagingState state;
         private final ConsistencyLevel serialConsistency;
+        private final long timestamp;
 
-        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency)
+        private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
         {
             this.pageSize = pageSize;
             this.state = state;
             this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
+            this.timestamp = timestamp;
         }
     }
 
@@ -151,7 +276,9 @@ public class QueryOptions
             SKIP_METADATA,
             PAGE_SIZE,
             PAGING_STATE,
-            SERIAL_CONSISTENCY;
+            SERIAL_CONSISTENCY,
+            TIMESTAMP,
+            NAMES_FOR_VALUES;
 
             public static EnumSet<Flag> deserialize(int flags)
             {
@@ -181,9 +308,21 @@ public class QueryOptions
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
             EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
 
-            List<ByteBuffer> values = flags.contains(Flag.VALUES)
-                                    ? CBUtil.readValueList(body)
-                                    : Collections.<ByteBuffer>emptyList();
+            List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+            List<String> names = null;
+            if (flags.contains(Flag.VALUES))
+            {
+                if (flags.contains(Flag.NAMES_FOR_VALUES))
+                {
+                    Pair<List<String>, List<ByteBuffer>> namesAndValues = CBUtil.readNameAndValueList(body);
+                    names = namesAndValues.left;
+                    values = namesAndValues.right;
+                }
+                else
+                {
+                    values = CBUtil.readValueList(body);
+                }
+            }
 
             boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
             flags.remove(Flag.VALUES);
@@ -195,9 +334,19 @@ public class QueryOptions
                 int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
                 PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
                 ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
-                options = new SpecificOptions(pageSize, pagingState, serialConsistency);
+                long timestamp = -1L;
+                if (flags.contains(Flag.TIMESTAMP))
+                {
+                    long ts = body.readLong();
+                    if (ts < 0)
+                        throw new ProtocolException("Invalid negative (" + ts + ") protocol level timestamp");
+                    timestamp = ts;
+                }
+
+                options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
             }
-            return new QueryOptions(consistency, values, skipMetadata, options, version);
+            DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+            return names == null ? opts : new OptionsWithNames(opts, names);
         }
 
         public void encode(QueryOptions options, ByteBuf dest, int version)
@@ -217,6 +366,12 @@ public class QueryOptions
                 CBUtil.writeValue(options.getPagingState().serialize(), dest);
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+            if (flags.contains(Flag.TIMESTAMP))
+                dest.writeLong(options.getSpecificOptions().timestamp);
+
+            // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
+            // and in fact we never really encode QueryOptions, only decode them, so we
+            // don't bother.
         }
 
         public int encodedSize(QueryOptions options, int version)
@@ -236,6 +391,8 @@ public class QueryOptions
                 size += CBUtil.sizeOfValue(options.getPagingState().serialize());
             if (flags.contains(Flag.SERIAL_CONSISTENCY))
                 size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+            if (flags.contains(Flag.TIMESTAMP))
+                size += 8;
 
             return size;
         }
@@ -245,7 +402,7 @@ public class QueryOptions
             EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
             if (options.getValues().size() > 0)
                 flags.add(Flag.VALUES);
-            if (options.skipMetadata)
+            if (options.skipMetadata())
                 flags.add(Flag.SKIP_METADATA);
             if (options.getPageSize() >= 0)
                 flags.add(Flag.PAGE_SIZE);
@@ -253,6 +410,8 @@ public class QueryOptions
                 flags.add(Flag.PAGING_STATE);
             if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
                 flags.add(Flag.SERIAL_CONSISTENCY);
+            if (options.getSpecificOptions().timestamp >= 0)
+                flags.add(Flag.TIMESTAMP);
             return flags;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index e8cee15..40c45af 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -53,12 +53,12 @@ public class QueryProcessor implements QueryHandler
     private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
     private static final int MAX_CACHE_PREPARED_COUNT = 10000;
 
-    private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+    private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
     {
         @Override
-        public int weightOf(MD5Digest key, CQLStatement value)
+        public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
         {
-            return Ints.checkedCast(measure(key) + measure(value));
+            return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
         }
     };
 
@@ -71,12 +71,12 @@ public class QueryProcessor implements QueryHandler
         }
     };
 
-    private static final ConcurrentLinkedHashMap<MD5Digest, CQLStatement> preparedStatements;
+    private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
     private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
 
     static
     {
-        preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()
+        preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
                              .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
                              .weigher(cqlMemoryUsageWeigher)
                              .build();
@@ -90,7 +90,7 @@ public class QueryProcessor implements QueryHandler
     {
     }
 
-    public CQLStatement getPrepared(MD5Digest id)
+    public ParsedStatement.Prepared getPrepared(MD5Digest id)
     {
         return preparedStatements.get(id);
     }
@@ -154,29 +154,31 @@ public class QueryProcessor implements QueryHandler
     public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
     throws RequestExecutionException, RequestValidationException
     {
-        return instance.process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+        return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
     }
 
     public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
-        CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
+        ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+        options.prepare(p.boundNames);
+        CQLStatement prepared = p.statement;
         if (prepared.getBoundTerms() != options.getValues().size())
             throw new InvalidRequestException("Invalid amount of bind variables");
 
         return processStatement(prepared, queryState, options);
     }
 
-    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+    public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
     {
-        return getStatement(queryStr, queryState.getClientState()).statement;
+        return getStatement(queryStr, queryState.getClientState());
     }
 
     public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
     {
         try
         {
-            ResultMessage result = instance.process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+            ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
             if (result instanceof ResultMessage.Rows)
                 return UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else
@@ -276,7 +278,7 @@ public class QueryProcessor implements QueryHandler
         else
         {
             MD5Digest statementId = MD5Digest.compute(toHash);
-            preparedStatements.put(statementId, prepared.statement);
+            preparedStatements.put(statementId, prepared);
             logger.trace(String.format("Stored prepared statement %s with %d bind markers",
                                        statementId,
                                        prepared.statement.getBoundTerms()));
@@ -312,8 +314,7 @@ public class QueryProcessor implements QueryHandler
         ClientState clientState = queryState.getClientState();
         batch.checkAccess(clientState);
         batch.validate(clientState);
-
-        batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
+        batch.execute(queryState, options);
         return new ResultMessage.Void();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 53ba380..eea0475 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -373,7 +373,7 @@ public class ResultSet
                     String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
                     String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
                     ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
-                    AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
+                    AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
                     names.add(new ColumnSpecification(ksName, cfName, colName, type));
                 }
                 return new Metadata(flags, names).setHasMorePages(state);
@@ -410,7 +410,7 @@ public class ResultSet
                             CBUtil.writeString(name.cfName, dest);
                         }
                         CBUtil.writeString(name.name.toString(), dest);
-                        DataType.codec.writeOne(DataType.fromType(name.type), dest);
+                        DataType.codec.writeOne(DataType.fromType(name.type, version), dest, version);
                     }
                 }
             }
@@ -442,7 +442,7 @@ public class ResultSet
                             size += CBUtil.sizeOfString(name.cfName);
                         }
                         size += CBUtil.sizeOfString(name.name.toString());
-                        size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
+                        size += DataType.codec.oneSerializedSize(DataType.fromType(name.type, version), version);
                     }
                 }
                 return size;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index e48a3ce..92a3510 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -88,7 +88,7 @@ public abstract class Sets
                 values.add(t);
             }
             DelayedValue value = new DelayedValue(((SetType)receiver.type).elements, values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -140,13 +140,13 @@ public abstract class Sets
             this.elements = elements;
         }
 
-        public static Value fromSerialized(ByteBuffer value, SetType type) throws InvalidRequestException
+        public static Value fromSerialized(ByteBuffer value, SetType type, int version) throws InvalidRequestException
         {
             try
             {
                 // Collections have this small hack that validate cannot be called on a serialized object,
                 // but compose does the validation (so we're fine).
-                Set<?> s = (Set<?>)type.compose(value);
+                Set<?> s = (Set<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
                 Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>(s.size());
                 for (Object element : s)
                     elements.add(type.elements.decompose(element));
@@ -158,9 +158,9 @@ public abstract class Sets
             }
         }
 
-        public ByteBuffer get()
+        public ByteBuffer get(QueryOptions options)
         {
-            return CollectionType.pack(new ArrayList<ByteBuffer>(elements), elements.size());
+            return CollectionSerializer.pack(new ArrayList<ByteBuffer>(elements), elements.size(), options.getProtocolVersion());
         }
     }
 
@@ -186,12 +186,12 @@ public abstract class Sets
         {
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
             Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
             for (Term t : elements)
             {
-                ByteBuffer bytes = t.bindAndGet(values);
+                ByteBuffer bytes = t.bindAndGet(options);
 
                 if (bytes == null)
                     throw new InvalidRequestException("null is not supported inside collections");
@@ -216,10 +216,10 @@ public abstract class Sets
             assert receiver.type instanceof SetType;
         }
 
-        public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+        public Value bind(QueryOptions options) throws InvalidRequestException
         {
-            ByteBuffer value = values.get(bindIndex);
-            return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type);
+            ByteBuffer value = options.getValues().get(bindIndex);
+            return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type, options.getProtocolVersion());
         }
     }
 
@@ -253,7 +253,7 @@ public abstract class Sets
 
         static void doAdd(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 
@@ -277,7 +277,7 @@ public abstract class Sets
 
         public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
         {
-            Term.Terminal value = t.bind(params.variables);
+            Term.Terminal value = t.bind(params.options);
             if (value == null)
                 return;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d539ecf..481514f 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -48,7 +48,7 @@ public interface Term
      * @return the result of binding all the variables of this NonTerminal (or
      * 'this' if the term is terminal).
      */
-    public Terminal bind(List<ByteBuffer> values) throws InvalidRequestException;
+    public Terminal bind(QueryOptions options) throws InvalidRequestException;
 
     /**
      * A shorter for bind(values).get().
@@ -56,7 +56,7 @@ public interface Term
      * object between the bind and the get (note that we still want to be able
      * to separate bind and get for collections).
      */
-    public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException;
+    public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException;
 
     /**
      * Whether or not that term contains at least one bind marker.
@@ -108,7 +108,7 @@ public interface Term
     public abstract class Terminal implements Term
     {
         public void collectMarkerSpecification(VariableSpecifications boundNames) {}
-        public Terminal bind(List<ByteBuffer> values) { return this; }
+        public Terminal bind(QueryOptions options) { return this; }
 
         // While some NonTerminal may not have bind markers, no Term can be Terminal
         // with a bind marker
@@ -120,11 +120,11 @@ public interface Term
         /**
          * @return the serialized value of this terminal.
          */
-        public abstract ByteBuffer get();
+        public abstract ByteBuffer get(QueryOptions options);
 
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            return get();
+            return get(options);
         }
     }
 
@@ -140,10 +140,10 @@ public interface Term
      */
     public abstract class NonTerminal implements Term
     {
-        public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            Terminal t = bind(values);
-            return t == null ? null : t.get();
+            Terminal t = bind(options);
+            return t == null ? null : t.get(options);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index fad8fae..8a47536 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 public class UpdateParameters
 {
     public final CFMetaData metadata;
-    public final List<ByteBuffer> variables;
+    public final QueryOptions options;
     public final long timestamp;
     private final int ttl;
     public final int localDeletionTime;
@@ -42,10 +42,10 @@ public class UpdateParameters
     // For lists operation that require a read-before-write. Will be null otherwise.
     private final Map<ByteBuffer, CQL3Row> prefetchedLists;
 
-    public UpdateParameters(CFMetaData metadata, List<ByteBuffer> variables, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
+    public UpdateParameters(CFMetaData metadata, QueryOptions options, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
     {
         this.metadata = metadata;
-        this.variables = variables;
+        this.options = options;
         this.timestamp = timestamp;
         this.ttl = ttl;
         this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index 2fd1a0f..2faa960 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -65,7 +65,7 @@ public abstract class UserTypes
                 values.add(value);
             }
             DelayedValue value = new DelayedValue(((UserType)receiver.type), values);
-            return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+            return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
         }
 
         private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -144,12 +144,16 @@ public abstract class UserTypes
                 values.get(i).collectMarkerSpecification(boundNames);
         }
 
-        private ByteBuffer[] bindInternal(List<ByteBuffer> variables) throws InvalidRequestException
+        private ByteBuffer[] bindInternal(QueryOptions options) throws InvalidRequestException
         {
+            // Inside UDT values, we must force the serialization of collections whatever the protocol version is in
+            // use since we're going to store directly that serialized value.
+            options = options.withProtocolVersion(3);
+
             ByteBuffer[] buffers = new ByteBuffer[values.size()];
             for (int i = 0; i < type.types.size(); i++)
             {
-                ByteBuffer buffer = values.get(i).bindAndGet(variables);
+                ByteBuffer buffer = values.get(i).bindAndGet(options);
                 if (buffer == null)
                     throw new InvalidRequestException("null is not supported inside user type literals");
                 if (buffer.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -163,15 +167,15 @@ public abstract class UserTypes
             return buffers;
         }
 
-        public Constants.Value bind(List<ByteBuffer> variables) throws InvalidRequestException
+        public Constants.Value bind(QueryOptions options) throws InvalidRequestException
         {
-            return new Constants.Value(bindAndGet(variables));
+            return new Constants.Value(bindAndGet(options));
         }
 
         @Override
-        public ByteBuffer bindAndGet(List<ByteBuffer> variables) throws InvalidRequestException
+        public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
         {
-            return CompositeType.build(bindInternal(variables));
+            return CompositeType.build(bindInternal(options));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 083543a..f99a2e4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -46,19 +46,19 @@ public class FunctionCall extends Term.NonTerminal
             t.collectMarkerSpecification(boundNames);
     }
 
-    public Term.Terminal bind(List<ByteBuffer> values) throws InvalidRequestException
+    public Term.Terminal bind(QueryOptions options) throws InvalidRequestException
     {
-        return makeTerminal(fun, bindAndGet(values));
+        return makeTerminal(fun, bindAndGet(options), options.getProtocolVersion());
     }
 
-    public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+    public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
     {
         List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(terms.size());
         for (Term t : terms)
         {
             // For now, we don't allow nulls as argument as no existing function needs it and it
             // simplify things.
-            ByteBuffer val = t.bindAndGet(values);
+            ByteBuffer val = t.bindAndGet(options);
             if (val == null)
                 throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
             buffers.add(val);
@@ -77,16 +77,16 @@ public class FunctionCall extends Term.NonTerminal
         return false;
     }
 
-    private static Term.Terminal makeTerminal(Function fun, ByteBuffer result) throws InvalidRequestException
+    private static Term.Terminal makeTerminal(Function fun, ByteBuffer result, int version) throws InvalidRequestException
     {
         if (!(fun.returnType() instanceof CollectionType))
             return new Constants.Value(result);
 
         switch (((CollectionType)fun.returnType()).kind)
         {
-            case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType());
-            case SET:  return Sets.Value.fromSerialized(result, (SetType)fun.returnType());
-            case MAP:  return Maps.Value.fromSerialized(result, (MapType)fun.returnType());
+            case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType(), version);
+            case SET:  return Sets.Value.fromSerialized(result, (SetType)fun.returnType(), version);
+            case MAP:  return Maps.Value.fromSerialized(result, (MapType)fun.returnType(), version);
         }
         throw new AssertionError();
     }
@@ -119,7 +119,7 @@ public class FunctionCall extends Term.NonTerminal
             // If all parameters are terminal and the function is pure, we can
             // evaluate it now, otherwise we'd have to wait execution time
             return allTerminal && fun.isPure()
-                ? makeTerminal(fun, execute(fun, parameters))
+                ? makeTerminal(fun, execute(fun, parameters), QueryOptions.DEFAULT.getProtocolVersion())
                 : new FunctionCall(fun, parameters);
         }
 
@@ -130,7 +130,7 @@ public class FunctionCall extends Term.NonTerminal
             for (Term t : parameters)
             {
                 assert t instanceof Term.Terminal;
-                buffers.add(((Term.Terminal)t).get());
+                buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
             }
             return fun.execute(buffers);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 88bb644..95d504d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -116,16 +116,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementVariables);
-            addStatementMutations(statement, statementVariables, local, cl, timestamp, mutations);
+            QueryOptions statementOptions = options.forStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementOptions);
+            addStatementMutations(statement, statementOptions, local, timestamp, mutations);
         }
         return unzipMutations(mutations);
     }
@@ -143,9 +143,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     }
 
     private void addStatementMutations(ModificationStatement statement,
-                                       List<ByteBuffer> variables,
+                                       QueryOptions options,
                                        boolean local,
-                                       ConsistencyLevel cl,
                                        long now,
                                        Map<String, Map<ByteBuffer, IMutation>> mutations)
     throws RequestExecutionException, RequestValidationException
@@ -161,9 +160,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         // The following does the same than statement.getMutations(), but we inline it here because
         // we don't want to recreate mutations every time as this is particularly inefficient when applying
         // multiple batch to the same partition (see #6737).
-        List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
-        Composite clusteringPrefix = statement.createClusteringPrefix(variables);
-        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+        List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
+        Composite clusteringPrefix = statement.createClusteringPrefix(options);
+        UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 
         for (ByteBuffer key : keys)
         {
@@ -172,7 +171,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             if (mutation == null)
             {
                 mut = new Mutation(ksName, key);
-                mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+                mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
                 ksMap.put(key, mutation);
             }
             else
@@ -209,29 +208,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
 
     public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        if (options.getConsistency() == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
-
-        return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), options.getSerialConsistency(), queryState.getTimestamp());
+        return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
     }
 
-    public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        if (cl == null)
-            throw new InvalidRequestException("Invalid empty consistency level");
-
-        return execute(new BatchOfPreparedVariables(variables), false, cl, ConsistencyLevel.SERIAL, queryState.getTimestamp());
+        return execute(options, false, options.getTimestamp(queryState));
     }
 
-    public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
+    public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
-        // We'll need to fix that.
+        if (options.getConsistency() == null)
+            throw new InvalidRequestException("Invalid empty consistency level");
+        if (options.getSerialConsistency() == null)
+            throw new InvalidRequestException("Invalid empty serial consistency level");
+
         if (hasConditions)
-            return executeWithConditions(variables, cl, serialCl, now);
+            return executeWithConditions(options, now);
 
-        executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
         return null;
     }
 
@@ -251,8 +247,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
     }
 
-
-    private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+    private ResultMessage executeWithConditions(BatchQueryOptions options, long now)
     throws RequestExecutionException, RequestValidationException
     {
         ByteBuffer key = null;
@@ -265,9 +260,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         for (int i = 0; i < statements.size(); i++)
         {
             ModificationStatement statement = statements.get(i);
-            List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
-            long timestamp = attrs.getTimestamp(now, statementVariables);
-            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+            QueryOptions statementOptions = options.forStatement(i);
+            long timestamp = attrs.getTimestamp(now, statementOptions);
+            List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementOptions);
             if (pks.size() > 1)
                 throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
             if (key == null)
@@ -283,10 +278,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
             }
 
-            Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
+            Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions);
             if (statement.hasConditions())
             {
-                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+                statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementOptions, timestamp);
                 // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet
                 if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition())
                     columnsWithConditions = null;
@@ -295,20 +290,20 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+                UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementOptions, false, now);
                 statement.addUpdateForKey(updates, key, clusteringPrefix, params);
             }
         }
 
         verifyBatchSize(Collections.singleton(updates));
-        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+        ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, options.getSerialConsistency(), options.getConsistency());
         return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
     }
 
     public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
-        for (IMutation mutation : getMutations(PreparedBatchVariables.EMPTY, true, null, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
@@ -322,38 +317,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
     }
 
-    public static class PreparedBatchVariables implements BatchVariables
-    {
-        public static final BatchVariables EMPTY = new PreparedBatchVariables(Collections.<ByteBuffer>emptyList());
-
-        private final List<ByteBuffer> variables;
-
-        public PreparedBatchVariables(List<ByteBuffer> variables)
-        {
-            this.variables = variables;
-        }
-
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
-        {
-            return variables;
-        }
-    }
-
-    public static class BatchOfPreparedVariables implements BatchVariables
-    {
-        private final List<List<ByteBuffer>> variables;
-
-        public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
-        {
-            this.variables = variables;
-        }
-
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
-        {
-            return variables.get(statementInBatch);
-        }
-    }
-
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
index 4003edc..5005d2f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -71,7 +71,7 @@ public class CQL3CasConditions implements CASConditions
             throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row");
     }
 
-    public void addConditions(Composite prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+    public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
     {
         RowCondition condition = conditions.get(prefix);
         if (condition == null)
@@ -83,7 +83,7 @@ public class CQL3CasConditions implements CASConditions
         {
             throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
         }
-        ((ColumnsConditions)condition).addConditions(conds, variables);
+        ((ColumnsConditions)condition).addConditions(conds, options);
     }
 
     public IDiskAtomFilter readFilter()
@@ -167,21 +167,21 @@ public class CQL3CasConditions implements CASConditions
 
     private static class ColumnsConditions extends RowCondition
     {
-        private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithVariables> conditions = new HashMap<>();
+        private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithOptions> conditions = new HashMap<>();
 
         private ColumnsConditions(Composite rowPrefix, long now)
         {
             super(rowPrefix, now);
         }
 
-        public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+        public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
         {
             for (ColumnCondition condition : conds)
             {
                 // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
                 // different list of variables.
-                ColumnCondition.WithVariables current = condition.with(variables);
-                ColumnCondition.WithVariables previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
+                ColumnCondition.WithOptions current = condition.with(options);
+                ColumnCondition.WithOptions previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
                 // If 2 conditions are actually equal, let it slide
                 if (previous != null && !previous.equalsTo(current))
                     throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
@@ -193,7 +193,7 @@ public class CQL3CasConditions implements CASConditions
             if (current == null)
                 return conditions.isEmpty();
 
-            for (ColumnCondition.WithVariables condition : conditions.values())
+            for (ColumnCondition.WithOptions condition : conditions.values())
                 if (!condition.appliesTo(rowPrefix, current, now))
                     return false;
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4741b9a..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -119,9 +119,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return cfm.isCounter();
     }
 
-    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+    public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimestamp(now, variables);
+        return attrs.getTimestamp(now, options);
     }
 
     public boolean isTimestampSet()
@@ -129,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return attrs.isTimestampSet();
     }
 
-    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+    public int getTimeToLive(QueryOptions options) throws InvalidRequestException
     {
-        return attrs.getTimeToLive(variables);
+        return attrs.getTimeToLive(options);
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -284,7 +284,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         }
     }
 
-    public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+    public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
     throws InvalidRequestException
     {
         CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
@@ -295,7 +295,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             if (r == null)
                 throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
 
-            List<ByteBuffer> values = r.values(variables);
+            List<ByteBuffer> values = r.values(options);
 
             if (keyBuilder.remainingCount() == 1)
             {
@@ -321,7 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return keys;
     }
 
-    public Composite createClusteringPrefix(List<ByteBuffer> variables)
+    public Composite createClusteringPrefix(QueryOptions options)
     throws InvalidRequestException
     {
         // If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -353,10 +353,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             }
         }
 
-        return createClusteringPrefixBuilderInternal(variables);
+        return createClusteringPrefixBuilderInternal(options);
     }
 
-    private Composite createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+    private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
     throws InvalidRequestException
     {
         CBuilder builder = cfm.comparator.prefixBuilder();
@@ -376,7 +376,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             }
             else
             {
-                List<ByteBuffer> values = r.values(variables);
+                List<ByteBuffer> values = r.values(options);
                 assert values.size() == 1; // We only allow IN for row keys so far
                 ByteBuffer val = values.get(0);
                 if (val == null)
@@ -488,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -498,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> variables = options.getValues();
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
         // We don't support IN for CAS operation so far
         if (keys.size() > 1)
             throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
 
         ByteBuffer key = keys.get(0);
 
-        CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
-        Composite prefix = createClusteringPrefix(variables);
+        long now = options.getTimestamp(queryState);
+        CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+        Composite prefix = createClusteringPrefix(options);
         ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
-        addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+        addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
 
         ColumnFamily result = StorageProxy.cas(keyspace(),
                                                columnFamily(),
@@ -521,10 +521,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         return new ResultMessage.Rows(buildCasResultSet(key, result));
     }
 
-    public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+    public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
     throws InvalidRequestException
     {
-        UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+        UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
         addUpdateForKey(updates, key, clusteringPrefix, updParams);
 
         if (ifNotExists)
@@ -541,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
         {
             if (columnConditions != null)
-                conditions.addConditions(clusteringPrefix, columnConditions, variables);
+                conditions.addConditions(clusteringPrefix, columnConditions, options);
             if (staticConditions != null)
-                conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, variables);
+                conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
         }
     }
 
@@ -614,7 +614,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
 
         long now = System.currentTimeMillis();
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
-        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+        SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
 
         return builder.build();
     }
@@ -624,7 +624,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
@@ -636,7 +636,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     /**
      * Convert statement into a list of mutations to apply on the server
      *
-     * @param variables value for prepared statement markers
+     * @param options value for prepared statement markers
      * @param local if true, any requests (for collections) performed by getMutation should be done locally only.
      * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
      * @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -644,13 +644,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
-        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
-        Composite clusteringPrefix = createClusteringPrefix(variables);
+        List<ByteBuffer> keys = buildPartitionKeyNames(options);
+        Composite clusteringPrefix = createClusteringPrefix(options);
 
-        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+        UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
@@ -659,22 +659,21 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             Mutation mut = new Mutation(cfm.ksName, key, cf);
-            mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
+            mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
         }
         return mutations;
     }
 
     public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
                                                  Composite prefix,
-                                                 List<ByteBuffer> variables,
+                                                 QueryOptions options,
                                                  boolean local,
-                                                 ConsistencyLevel cl,
                                                  long now)
     throws RequestExecutionException, RequestValidationException
     {
         // Some lists operation requires reading
-        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
-        return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+        Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+        return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
     }
 
     public static abstract class Parsed extends CFStatement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 6b7eca7..4fd02c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -42,7 +42,7 @@ public interface Restriction
     public boolean isContains();
 
     // Not supported by Slice, but it's convenient to have here
-    public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
+    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
 
     public static class EQ implements Restriction
     {
@@ -55,9 +55,9 @@ public interface Restriction
             this.onToken = onToken;
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
-            return Collections.singletonList(value.bindAndGet(variables));
+            return Collections.singletonList(value.bindAndGet(options));
         }
 
         public boolean isSlice()
@@ -145,11 +145,11 @@ public interface Restriction
                 this.values = values;
             }
 
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+            public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
             {
                 List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
                 for (Term value : values)
-                    buffers.add(value.bindAndGet(variables));
+                    buffers.add(value.bindAndGet(options));
                 return buffers;
             }
 
@@ -174,9 +174,9 @@ public interface Restriction
                 this.marker = marker;
             }
 
-            public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+            public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
             {
-                Lists.Value lval = marker.bind(variables);
+                Lists.Value lval = marker.bind(options);
                 if (lval == null)
                     throw new InvalidRequestException("Invalid null value for IN restriction");
                 return lval.elements;
@@ -234,7 +234,7 @@ public interface Restriction
             return false;
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
             throw new UnsupportedOperationException();
         }
@@ -249,9 +249,9 @@ public interface Restriction
             return bounds[b.idx] != null;
         }
 
-        public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+        public ByteBuffer bound(Bound b, QueryOptions options) throws InvalidRequestException
         {
-            return bounds[b.idx].bindAndGet(variables);
+            return bounds[b.idx].bindAndGet(options);
         }
 
         public boolean isInclusive(Bound b)
@@ -379,25 +379,25 @@ public interface Restriction
             keys.add(t);
         }
 
-        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
             if (values == null)
                 return Collections.emptyList();
 
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
             for (Term value : values)
-                buffers.add(value.bindAndGet(variables));
+                buffers.add(value.bindAndGet(options));
             return buffers;
         }
 
-        public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+        public List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
         {
             if (keys == null)
                 return Collections.emptyList();
 
             List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
             for (Term value : keys)
-                buffers.add(value.bindAndGet(variables));
+                buffers.add(value.bindAndGet(options));
             return buffers;
         }