You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/30 20:22:42 UTC
[3/4] Native protocol v3
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 76b1eeb..12accaf 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -25,120 +26,244 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.transport.CBCodec;
import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.Pair;
/**
* Options for a query.
*/
-public class QueryOptions
+public abstract class QueryOptions
{
- public static final QueryOptions DEFAULT = new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList());
+ public static final QueryOptions DEFAULT = new DefaultQueryOptions(ConsistencyLevel.ONE,
+ Collections.<ByteBuffer>emptyList(),
+ false,
+ SpecificOptions.DEFAULT,
+ 3);
public static final CBCodec<QueryOptions> codec = new Codec();
- private final ConsistencyLevel consistency;
- private final List<ByteBuffer> values;
- private final boolean skipMetadata;
-
- private final SpecificOptions options;
+ public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+ {
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+ }
- // The protocol version of incoming queries. This is set during deserializaion and will be 0
- // if the QueryOptions does not come from a user message (or come from thrift).
- private final transient int protocolVersion;
+ public static QueryOptions fromProtocolV2(ConsistencyLevel consistency, List<ByteBuffer> values)
+ {
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 2);
+ }
- public QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values)
+ public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
{
- this(consistency, values, false, SpecificOptions.DEFAULT, 0);
+ return new DefaultQueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 0);
}
- public QueryOptions(ConsistencyLevel consistency,
- List<ByteBuffer> values,
- boolean skipMetadata,
- int pageSize,
- PagingState pagingState,
- ConsistencyLevel serialConsistency)
+ public static QueryOptions fromPreV3Batch(ConsistencyLevel consistency)
{
- this(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency), 0);
+ return new DefaultQueryOptions(consistency, Collections.<ByteBuffer>emptyList(), false, SpecificOptions.DEFAULT, 2);
}
- private QueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+ public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency)
{
- this.consistency = consistency;
- this.values = values;
- this.skipMetadata = skipMetadata;
- this.options = options;
- this.protocolVersion = protocolVersion;
+ return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), 0);
}
- public static QueryOptions fromProtocolV1(ConsistencyLevel consistency, List<ByteBuffer> values)
+ public abstract ConsistencyLevel getConsistency();
+ public abstract List<ByteBuffer> getValues();
+ public abstract boolean skipMetadata();
+
+ /** The pageSize for this query. Will be <= 0 if not relevant for the query. */
+ public int getPageSize()
{
- return new QueryOptions(consistency, values, false, SpecificOptions.DEFAULT, 1);
+ return getSpecificOptions().pageSize;
}
- public ConsistencyLevel getConsistency()
+ /** The paging state for this query, or null if not relevant. */
+ public PagingState getPagingState()
{
- return consistency;
+ return getSpecificOptions().state;
}
- public List<ByteBuffer> getValues()
+ /** Serial consistency for conditional updates. */
+ public ConsistencyLevel getSerialConsistency()
{
- return values;
+ return getSpecificOptions().serialConsistency;
}
- public boolean skipMetadata()
+ public long getTimestamp(QueryState state)
{
- return skipMetadata;
+ long tstamp = getSpecificOptions().timestamp;
+ return tstamp >= 0 ? tstamp : state.getTimestamp();
}
/**
- * The pageSize for this query. Will be <= 0 if not relevant for the query.
+ * The protocol version for the query. Will be 3 if the object doesn't come from
+ * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
*/
- public int getPageSize()
+ public abstract int getProtocolVersion();
+
+ public abstract QueryOptions withProtocolVersion(int version);
+
+ // Mainly for the sake of BatchQueryOptions
+ abstract SpecificOptions getSpecificOptions();
+
+ public QueryOptions prepare(List<ColumnSpecification> specs)
{
- return options.pageSize;
+ return this;
}
- /**
- * The paging state for this query, or null if not relevant.
- */
- public PagingState getPagingState()
+ static class DefaultQueryOptions extends QueryOptions
{
- return options.state;
+ private final ConsistencyLevel consistency;
+ private final List<ByteBuffer> values;
+ private final boolean skipMetadata;
+
+ private final SpecificOptions options;
+
+ // The protocol version of incoming queries. This is set during deserialization and will be 0
+ // if the QueryOptions does not come from a user message (or comes from thrift).
+ private final transient int protocolVersion;
+
+ DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, int protocolVersion)
+ {
+ this.consistency = consistency;
+ this.values = values;
+ this.skipMetadata = skipMetadata;
+ this.options = options;
+ this.protocolVersion = protocolVersion;
+ }
+
+ public QueryOptions withProtocolVersion(int version)
+ {
+ return new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+ }
+
+ public ConsistencyLevel getConsistency()
+ {
+ return consistency;
+ }
+
+ public List<ByteBuffer> getValues()
+ {
+ return values;
+ }
+
+ public boolean skipMetadata()
+ {
+ return skipMetadata;
+ }
+
+ public int getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ SpecificOptions getSpecificOptions()
+ {
+ return options;
+ }
}
- /**
- * Serial consistency for conditional updates.
- */
- public ConsistencyLevel getSerialConsistency()
+ static abstract class QueryOptionsWrapper extends QueryOptions
{
- return options.serialConsistency;
+ protected final QueryOptions wrapped;
+
+ QueryOptionsWrapper(QueryOptions wrapped)
+ {
+ this.wrapped = wrapped;
+ }
+
+ public ConsistencyLevel getConsistency()
+ {
+ return wrapped.getConsistency();
+ }
+
+ public boolean skipMetadata()
+ {
+ return wrapped.skipMetadata();
+ }
+
+ public int getProtocolVersion()
+ {
+ return wrapped.getProtocolVersion();
+ }
+
+ SpecificOptions getSpecificOptions()
+ {
+ return wrapped.getSpecificOptions();
+ }
+
+ @Override
+ public QueryOptions prepare(List<ColumnSpecification> specs)
+ {
+ wrapped.prepare(specs);
+ return this;
+ }
+
+ public QueryOptions withProtocolVersion(int version)
+ {
+ return new DefaultQueryOptions(getConsistency(), getValues(), skipMetadata(), getSpecificOptions(), version);
+ }
}
- /**
- * The protocol version for the query. Will be 0 if the object don't come from
- * a native protocol request (i.e. it's been allocated locally or by CQL-over-thrift).
- */
- public int getProtocolVersion()
+ static class OptionsWithNames extends QueryOptionsWrapper
{
- return protocolVersion;
+ private final List<String> names;
+ private List<ByteBuffer> orderedValues;
+
+ OptionsWithNames(DefaultQueryOptions wrapped, List<String> names)
+ {
+ super(wrapped);
+ this.names = names;
+ }
+
+ @Override
+ public QueryOptions prepare(List<ColumnSpecification> specs)
+ {
+ super.prepare(specs);
+
+ orderedValues = new ArrayList<ByteBuffer>(specs.size());
+ for (int i = 0; i < specs.size(); i++)
+ {
+ String name = specs.get(i).name.toString();
+ for (int j = 0; j < names.size(); j++)
+ {
+ if (name.equals(names.get(j)))
+ {
+ orderedValues.add(wrapped.getValues().get(j));
+ break;
+ }
+ }
+ }
+ return this;
+ }
+
+ public List<ByteBuffer> getValues()
+ {
+ assert orderedValues != null; // We should have called prepare first!
+ return orderedValues;
+ }
}
// Options that are likely to not be present in most queries
- private static class SpecificOptions
+ static class SpecificOptions
{
- private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null);
+ private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, -1L);
private final int pageSize;
private final PagingState state;
private final ConsistencyLevel serialConsistency;
+ private final long timestamp;
- private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency)
+ private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp)
{
this.pageSize = pageSize;
this.state = state;
this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
+ this.timestamp = timestamp;
}
}
@@ -151,7 +276,9 @@ public class QueryOptions
SKIP_METADATA,
PAGE_SIZE,
PAGING_STATE,
- SERIAL_CONSISTENCY;
+ SERIAL_CONSISTENCY,
+ TIMESTAMP,
+ NAMES_FOR_VALUES;
public static EnumSet<Flag> deserialize(int flags)
{
@@ -181,9 +308,21 @@ public class QueryOptions
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
- List<ByteBuffer> values = flags.contains(Flag.VALUES)
- ? CBUtil.readValueList(body)
- : Collections.<ByteBuffer>emptyList();
+ List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
+ List<String> names = null;
+ if (flags.contains(Flag.VALUES))
+ {
+ if (flags.contains(Flag.NAMES_FOR_VALUES))
+ {
+ Pair<List<String>, List<ByteBuffer>> namesAndValues = CBUtil.readNameAndValueList(body);
+ names = namesAndValues.left;
+ values = namesAndValues.right;
+ }
+ else
+ {
+ values = CBUtil.readValueList(body);
+ }
+ }
boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
flags.remove(Flag.VALUES);
@@ -195,9 +334,19 @@ public class QueryOptions
int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
- options = new SpecificOptions(pageSize, pagingState, serialConsistency);
+ long timestamp = -1L;
+ if (flags.contains(Flag.TIMESTAMP))
+ {
+ long ts = body.readLong();
+ if (ts < 0)
+ throw new ProtocolException("Invalid negative (" + ts + ") protocol level timestamp");
+ timestamp = ts;
+ }
+
+ options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp);
}
- return new QueryOptions(consistency, values, skipMetadata, options, version);
+ DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version);
+ return names == null ? opts : new OptionsWithNames(opts, names);
}
public void encode(QueryOptions options, ByteBuf dest, int version)
@@ -217,6 +366,12 @@ public class QueryOptions
CBUtil.writeValue(options.getPagingState().serialize(), dest);
if (flags.contains(Flag.SERIAL_CONSISTENCY))
CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
+ if (flags.contains(Flag.TIMESTAMP))
+ dest.writeLong(options.getSpecificOptions().timestamp);
+
+ // Note that we don't really have to bother with NAMES_FOR_VALUES server side,
+ // and in fact we never really encode QueryOptions, only decode them, so we
+ // don't bother.
}
public int encodedSize(QueryOptions options, int version)
@@ -236,6 +391,8 @@ public class QueryOptions
size += CBUtil.sizeOfValue(options.getPagingState().serialize());
if (flags.contains(Flag.SERIAL_CONSISTENCY))
size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
+ if (flags.contains(Flag.TIMESTAMP))
+ size += 8;
return size;
}
@@ -245,7 +402,7 @@ public class QueryOptions
EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
if (options.getValues().size() > 0)
flags.add(Flag.VALUES);
- if (options.skipMetadata)
+ if (options.skipMetadata())
flags.add(Flag.SKIP_METADATA);
if (options.getPageSize() >= 0)
flags.add(Flag.PAGE_SIZE);
@@ -253,6 +410,8 @@ public class QueryOptions
flags.add(Flag.PAGING_STATE);
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
flags.add(Flag.SERIAL_CONSISTENCY);
+ if (options.getSpecificOptions().timestamp >= 0)
+ flags.add(Flag.TIMESTAMP);
return flags;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index e8cee15..40c45af 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -53,12 +53,12 @@ public class QueryProcessor implements QueryHandler
private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256;
private static final int MAX_CACHE_PREPARED_COUNT = 10000;
- private static EntryWeigher<MD5Digest, CQLStatement> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, CQLStatement>()
+ private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>()
{
@Override
- public int weightOf(MD5Digest key, CQLStatement value)
+ public int weightOf(MD5Digest key, ParsedStatement.Prepared value)
{
- return Ints.checkedCast(measure(key) + measure(value));
+ return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames));
}
};
@@ -71,12 +71,12 @@ public class QueryProcessor implements QueryHandler
}
};
- private static final ConcurrentLinkedHashMap<MD5Digest, CQLStatement> preparedStatements;
+ private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements;
private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements;
static
{
- preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>()
+ preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>()
.maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY)
.weigher(cqlMemoryUsageWeigher)
.build();
@@ -90,7 +90,7 @@ public class QueryProcessor implements QueryHandler
{
}
- public CQLStatement getPrepared(MD5Digest id)
+ public ParsedStatement.Prepared getPrepared(MD5Digest id)
{
return preparedStatements.get(id);
}
@@ -154,29 +154,31 @@ public class QueryProcessor implements QueryHandler
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- return instance.process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+ return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
}
public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
- CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
+ ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
+ options.prepare(p.boundNames);
+ CQLStatement prepared = p.statement;
if (prepared.getBoundTerms() != options.getValues().size())
throw new InvalidRequestException("Invalid amount of bind variables");
return processStatement(prepared, queryState, options);
}
- public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
+ public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
{
- return getStatement(queryStr, queryState.getClientState()).statement;
+ return getStatement(queryStr, queryState.getClientState());
}
public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
{
try
{
- ResultMessage result = instance.process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
+ ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
if (result instanceof ResultMessage.Rows)
return UntypedResultSet.create(((ResultMessage.Rows)result).result);
else
@@ -276,7 +278,7 @@ public class QueryProcessor implements QueryHandler
else
{
MD5Digest statementId = MD5Digest.compute(toHash);
- preparedStatements.put(statementId, prepared.statement);
+ preparedStatements.put(statementId, prepared);
logger.trace(String.format("Stored prepared statement %s with %d bind markers",
statementId,
prepared.statement.getBoundTerms()));
@@ -312,8 +314,7 @@ public class QueryProcessor implements QueryHandler
ClientState clientState = queryState.getClientState();
batch.checkAccess(clientState);
batch.validate(clientState);
-
- batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
+ batch.execute(queryState, options);
return new ResultMessage.Void();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 53ba380..eea0475 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -373,7 +373,7 @@ public class ResultSet
String ksName = globalTablesSpec ? globalKsName : CBUtil.readString(body);
String cfName = globalTablesSpec ? globalCfName : CBUtil.readString(body);
ColumnIdentifier colName = new ColumnIdentifier(CBUtil.readString(body), true);
- AbstractType type = DataType.toType(DataType.codec.decodeOne(body));
+ AbstractType type = DataType.toType(DataType.codec.decodeOne(body, version));
names.add(new ColumnSpecification(ksName, cfName, colName, type));
}
return new Metadata(flags, names).setHasMorePages(state);
@@ -410,7 +410,7 @@ public class ResultSet
CBUtil.writeString(name.cfName, dest);
}
CBUtil.writeString(name.name.toString(), dest);
- DataType.codec.writeOne(DataType.fromType(name.type), dest);
+ DataType.codec.writeOne(DataType.fromType(name.type, version), dest, version);
}
}
}
@@ -442,7 +442,7 @@ public class ResultSet
size += CBUtil.sizeOfString(name.cfName);
}
size += CBUtil.sizeOfString(name.name.toString());
- size += DataType.codec.oneSerializedSize(DataType.fromType(name.type));
+ size += DataType.codec.oneSerializedSize(DataType.fromType(name.type, version), version);
}
}
return size;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index e48a3ce..92a3510 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -88,7 +88,7 @@ public abstract class Sets
values.add(t);
}
DelayedValue value = new DelayedValue(((SetType)receiver.type).elements, values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -140,13 +140,13 @@ public abstract class Sets
this.elements = elements;
}
- public static Value fromSerialized(ByteBuffer value, SetType type) throws InvalidRequestException
+ public static Value fromSerialized(ByteBuffer value, SetType type, int version) throws InvalidRequestException
{
try
{
// Collections have this small hack that validate cannot be called on a serialized object,
// but compose does the validation (so we're fine).
- Set<?> s = (Set<?>)type.compose(value);
+ Set<?> s = (Set<?>)type.getSerializer().deserializeForNativeProtocol(value, version);
Set<ByteBuffer> elements = new LinkedHashSet<ByteBuffer>(s.size());
for (Object element : s)
elements.add(type.elements.decompose(element));
@@ -158,9 +158,9 @@ public abstract class Sets
}
}
- public ByteBuffer get()
+ public ByteBuffer get(QueryOptions options)
{
- return CollectionType.pack(new ArrayList<ByteBuffer>(elements), elements.size());
+ return CollectionSerializer.pack(new ArrayList<ByteBuffer>(elements), elements.size(), options.getProtocolVersion());
}
}
@@ -186,12 +186,12 @@ public abstract class Sets
{
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
Set<ByteBuffer> buffers = new TreeSet<ByteBuffer>(comparator);
for (Term t : elements)
{
- ByteBuffer bytes = t.bindAndGet(values);
+ ByteBuffer bytes = t.bindAndGet(options);
if (bytes == null)
throw new InvalidRequestException("null is not supported inside collections");
@@ -216,10 +216,10 @@ public abstract class Sets
assert receiver.type instanceof SetType;
}
- public Value bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Value bind(QueryOptions options) throws InvalidRequestException
{
- ByteBuffer value = values.get(bindIndex);
- return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type);
+ ByteBuffer value = options.getValues().get(bindIndex);
+ return value == null ? null : Value.fromSerialized(value, (SetType)receiver.type, options.getProtocolVersion());
}
}
@@ -253,7 +253,7 @@ public abstract class Sets
static void doAdd(Term t, ColumnFamily cf, Composite prefix, ColumnDefinition column, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
@@ -277,7 +277,7 @@ public abstract class Sets
public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException
{
- Term.Terminal value = t.bind(params.variables);
+ Term.Terminal value = t.bind(params.options);
if (value == null)
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index d539ecf..481514f 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -48,7 +48,7 @@ public interface Term
* @return the result of binding all the variables of this NonTerminal (or
* 'this' if the term is terminal).
*/
- public Terminal bind(List<ByteBuffer> values) throws InvalidRequestException;
+ public Terminal bind(QueryOptions options) throws InvalidRequestException;
/**
* A shorter for bind(values).get().
@@ -56,7 +56,7 @@ public interface Term
* object between the bind and the get (note that we still want to be able
* to separate bind and get for collections).
*/
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException;
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException;
/**
* Whether or not that term contains at least one bind marker.
@@ -108,7 +108,7 @@ public interface Term
public abstract class Terminal implements Term
{
public void collectMarkerSpecification(VariableSpecifications boundNames) {}
- public Terminal bind(List<ByteBuffer> values) { return this; }
+ public Terminal bind(QueryOptions options) { return this; }
// While some NonTerminal may not have bind markers, no Term can be Terminal
// with a bind marker
@@ -120,11 +120,11 @@ public interface Term
/**
* @return the serialized value of this terminal.
*/
- public abstract ByteBuffer get();
+ public abstract ByteBuffer get(QueryOptions options);
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- return get();
+ return get(options);
}
}
@@ -140,10 +140,10 @@ public interface Term
*/
public abstract class NonTerminal implements Term
{
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- Terminal t = bind(values);
- return t == null ? null : t.get();
+ Terminal t = bind(options);
+ return t == null ? null : t.get(options);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index fad8fae..8a47536 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
public class UpdateParameters
{
public final CFMetaData metadata;
- public final List<ByteBuffer> variables;
+ public final QueryOptions options;
public final long timestamp;
private final int ttl;
public final int localDeletionTime;
@@ -42,10 +42,10 @@ public class UpdateParameters
// For lists operation that require a read-before-write. Will be null otherwise.
private final Map<ByteBuffer, CQL3Row> prefetchedLists;
- public UpdateParameters(CFMetaData metadata, List<ByteBuffer> variables, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
+ public UpdateParameters(CFMetaData metadata, QueryOptions options, long timestamp, int ttl, Map<ByteBuffer, CQL3Row> prefetchedLists)
{
this.metadata = metadata;
- this.variables = variables;
+ this.options = options;
this.timestamp = timestamp;
this.ttl = ttl;
this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/UserTypes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UserTypes.java b/src/java/org/apache/cassandra/cql3/UserTypes.java
index 2fd1a0f..2faa960 100644
--- a/src/java/org/apache/cassandra/cql3/UserTypes.java
+++ b/src/java/org/apache/cassandra/cql3/UserTypes.java
@@ -65,7 +65,7 @@ public abstract class UserTypes
values.add(value);
}
DelayedValue value = new DelayedValue(((UserType)receiver.type), values);
- return allTerminal ? value.bind(Collections.<ByteBuffer>emptyList()) : value;
+ return allTerminal ? value.bind(QueryOptions.DEFAULT) : value;
}
private void validateAssignableTo(String keyspace, ColumnSpecification receiver) throws InvalidRequestException
@@ -144,12 +144,16 @@ public abstract class UserTypes
values.get(i).collectMarkerSpecification(boundNames);
}
- private ByteBuffer[] bindInternal(List<ByteBuffer> variables) throws InvalidRequestException
+ private ByteBuffer[] bindInternal(QueryOptions options) throws InvalidRequestException
{
+ // Inside UDT values, we must force the v3 serialization of collections regardless of the protocol
+ // version in use, since we are going to store that serialized value directly.
+ options = options.withProtocolVersion(3);
+
ByteBuffer[] buffers = new ByteBuffer[values.size()];
for (int i = 0; i < type.types.size(); i++)
{
- ByteBuffer buffer = values.get(i).bindAndGet(variables);
+ ByteBuffer buffer = values.get(i).bindAndGet(options);
if (buffer == null)
throw new InvalidRequestException("null is not supported inside user type literals");
if (buffer.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
@@ -163,15 +167,15 @@ public abstract class UserTypes
return buffers;
}
- public Constants.Value bind(List<ByteBuffer> variables) throws InvalidRequestException
+ public Constants.Value bind(QueryOptions options) throws InvalidRequestException
{
- return new Constants.Value(bindAndGet(variables));
+ return new Constants.Value(bindAndGet(options));
}
@Override
- public ByteBuffer bindAndGet(List<ByteBuffer> variables) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
- return CompositeType.build(bindInternal(variables));
+ return CompositeType.build(bindInternal(options));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
index 083543a..f99a2e4 100644
--- a/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
+++ b/src/java/org/apache/cassandra/cql3/functions/FunctionCall.java
@@ -46,19 +46,19 @@ public class FunctionCall extends Term.NonTerminal
t.collectMarkerSpecification(boundNames);
}
- public Term.Terminal bind(List<ByteBuffer> values) throws InvalidRequestException
+ public Term.Terminal bind(QueryOptions options) throws InvalidRequestException
{
- return makeTerminal(fun, bindAndGet(values));
+ return makeTerminal(fun, bindAndGet(options), options.getProtocolVersion());
}
- public ByteBuffer bindAndGet(List<ByteBuffer> values) throws InvalidRequestException
+ public ByteBuffer bindAndGet(QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(terms.size());
for (Term t : terms)
{
// For now, we don't allow nulls as argument as no existing function needs it and it
// simplify things.
- ByteBuffer val = t.bindAndGet(values);
+ ByteBuffer val = t.bindAndGet(options);
if (val == null)
throw new InvalidRequestException(String.format("Invalid null value for argument to %s", fun));
buffers.add(val);
@@ -77,16 +77,16 @@ public class FunctionCall extends Term.NonTerminal
return false;
}
- private static Term.Terminal makeTerminal(Function fun, ByteBuffer result) throws InvalidRequestException
+ private static Term.Terminal makeTerminal(Function fun, ByteBuffer result, int version) throws InvalidRequestException
{
if (!(fun.returnType() instanceof CollectionType))
return new Constants.Value(result);
switch (((CollectionType)fun.returnType()).kind)
{
- case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType());
- case SET: return Sets.Value.fromSerialized(result, (SetType)fun.returnType());
- case MAP: return Maps.Value.fromSerialized(result, (MapType)fun.returnType());
+ case LIST: return Lists.Value.fromSerialized(result, (ListType)fun.returnType(), version);
+ case SET: return Sets.Value.fromSerialized(result, (SetType)fun.returnType(), version);
+ case MAP: return Maps.Value.fromSerialized(result, (MapType)fun.returnType(), version);
}
throw new AssertionError();
}
@@ -119,7 +119,7 @@ public class FunctionCall extends Term.NonTerminal
// If all parameters are terminal and the function is pure, we can
// evaluate it now; otherwise we'd have to wait until execution time
return allTerminal && fun.isPure()
- ? makeTerminal(fun, execute(fun, parameters))
+ ? makeTerminal(fun, execute(fun, parameters), QueryOptions.DEFAULT.getProtocolVersion())
: new FunctionCall(fun, parameters);
}
@@ -130,7 +130,7 @@ public class FunctionCall extends Term.NonTerminal
for (Term t : parameters)
{
assert t instanceof Term.Terminal;
- buffers.add(((Term.Terminal)t).get());
+ buffers.add(((Term.Terminal)t).get(QueryOptions.DEFAULT));
}
return fun.execute(buffers);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 88bb644..95d504d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -116,16 +116,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return statements;
}
- private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
- List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
- long timestamp = attrs.getTimestamp(now, statementVariables);
- addStatementMutations(statement, statementVariables, local, cl, timestamp, mutations);
+ QueryOptions statementOptions = options.forStatement(i);
+ long timestamp = attrs.getTimestamp(now, statementOptions);
+ addStatementMutations(statement, statementOptions, local, timestamp, mutations);
}
return unzipMutations(mutations);
}
@@ -143,9 +143,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
private void addStatementMutations(ModificationStatement statement,
- List<ByteBuffer> variables,
+ QueryOptions options,
boolean local,
- ConsistencyLevel cl,
long now,
Map<String, Map<ByteBuffer, IMutation>> mutations)
throws RequestExecutionException, RequestValidationException
@@ -161,9 +160,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
// The following does the same as statement.getMutations(), but we inline it here because
// we don't want to recreate mutations every time, as this is particularly inefficient when applying
// multiple batches to the same partition (see #6737).
- List<ByteBuffer> keys = statement.buildPartitionKeyNames(variables);
- Composite clusteringPrefix = statement.createClusteringPrefix(variables);
- UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+ List<ByteBuffer> keys = statement.buildPartitionKeyNames(options);
+ Composite clusteringPrefix = statement.createClusteringPrefix(options);
+ UpdateParameters params = statement.makeUpdateParameters(keys, clusteringPrefix, options, local, now);
for (ByteBuffer key : keys)
{
@@ -172,7 +171,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
if (mutation == null)
{
mut = new Mutation(ksName, key);
- mutation = type == Type.COUNTER ? new CounterMutation(mut, cl) : mut;
+ mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
ksMap.put(key, mutation);
}
else
@@ -209,29 +208,26 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- if (options.getConsistency() == null)
- throw new InvalidRequestException("Invalid empty consistency level");
-
- return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), options.getSerialConsistency(), queryState.getTimestamp());
+ return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
}
- public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
{
- if (cl == null)
- throw new InvalidRequestException("Invalid empty consistency level");
-
- return execute(new BatchOfPreparedVariables(variables), false, cl, ConsistencyLevel.SERIAL, queryState.getTimestamp());
+ return execute(options, false, options.getTimestamp(queryState));
}
- public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
+ public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
- // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
- // We'll need to fix that.
+ if (options.getConsistency() == null)
+ throw new InvalidRequestException("Invalid empty consistency level");
+ if (options.getSerialConsistency() == null)
+ throw new InvalidRequestException("Invalid empty serial consistency level");
+
if (hasConditions)
- return executeWithConditions(variables, cl, serialCl, now);
+ return executeWithConditions(options, now);
- executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+ executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
return null;
}
@@ -251,8 +247,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
}
-
- private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCf, long now)
+ private ResultMessage executeWithConditions(BatchQueryOptions options, long now)
throws RequestExecutionException, RequestValidationException
{
ByteBuffer key = null;
@@ -265,9 +260,9 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
- List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
- long timestamp = attrs.getTimestamp(now, statementVariables);
- List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+ QueryOptions statementOptions = options.forStatement(i);
+ long timestamp = attrs.getTimestamp(now, statementOptions);
+ List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementOptions);
if (pks.size() > 1)
throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
if (key == null)
@@ -283,10 +278,10 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
}
- Composite clusteringPrefix = statement.createClusteringPrefix(statementVariables);
+ Composite clusteringPrefix = statement.createClusteringPrefix(statementOptions);
if (statement.hasConditions())
{
- statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+ statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementOptions, timestamp);
// As soon as we have an IF NOT EXISTS or IF EXISTS condition, we set columnsWithConditions to null so that everything is in the resultSet
if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition())
columnsWithConditions = null;
@@ -295,20 +290,20 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
}
else
{
- UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementVariables, false, cl, now);
+ UpdateParameters params = statement.makeUpdateParameters(Collections.singleton(key), clusteringPrefix, statementOptions, false, now);
statement.addUpdateForKey(updates, key, clusteringPrefix, params);
}
}
verifyBatchSize(Collections.singleton(updates));
- ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCf, cl);
+ ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, options.getSerialConsistency(), options.getConsistency());
return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
}
public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
{
assert !hasConditions;
- for (IMutation mutation : getMutations(PreparedBatchVariables.EMPTY, true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(BatchQueryOptions.DEFAULT, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
@@ -322,38 +317,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
}
- public static class PreparedBatchVariables implements BatchVariables
- {
- public static final BatchVariables EMPTY = new PreparedBatchVariables(Collections.<ByteBuffer>emptyList());
-
- private final List<ByteBuffer> variables;
-
- public PreparedBatchVariables(List<ByteBuffer> variables)
- {
- this.variables = variables;
- }
-
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
- {
- return variables;
- }
- }
-
- public static class BatchOfPreparedVariables implements BatchVariables
- {
- private final List<List<ByteBuffer>> variables;
-
- public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
- {
- this.variables = variables;
- }
-
- public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
- {
- return variables.get(statementInBatch);
- }
- }
-
public String toString()
{
return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
index 4003edc..5005d2f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -71,7 +71,7 @@ public class CQL3CasConditions implements CASConditions
throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row");
}
- public void addConditions(Composite prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
{
RowCondition condition = conditions.get(prefix);
if (condition == null)
@@ -83,7 +83,7 @@ public class CQL3CasConditions implements CASConditions
{
throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
}
- ((ColumnsConditions)condition).addConditions(conds, variables);
+ ((ColumnsConditions)condition).addConditions(conds, options);
}
public IDiskAtomFilter readFilter()
@@ -167,21 +167,21 @@ public class CQL3CasConditions implements CASConditions
private static class ColumnsConditions extends RowCondition
{
- private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithVariables> conditions = new HashMap<>();
+ private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.WithOptions> conditions = new HashMap<>();
private ColumnsConditions(Composite rowPrefix, long now)
{
super(rowPrefix, now);
}
- public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException
{
for (ColumnCondition condition : conds)
{
// We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
// different list of variables.
- ColumnCondition.WithVariables current = condition.with(variables);
- ColumnCondition.WithVariables previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
+ ColumnCondition.WithOptions current = condition.with(options);
+ ColumnCondition.WithOptions previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current);
// If 2 conditions are actually equal, let it slide
if (previous != null && !previous.equalsTo(current))
throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
@@ -193,7 +193,7 @@ public class CQL3CasConditions implements CASConditions
if (current == null)
return conditions.isEmpty();
- for (ColumnCondition.WithVariables condition : conditions.values())
+ for (ColumnCondition.WithOptions condition : conditions.values())
if (!condition.appliesTo(rowPrefix, current, now))
return false;
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4741b9a..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -119,9 +119,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return cfm.isCounter();
}
- public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+ public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimestamp(now, variables);
+ return attrs.getTimestamp(now, options);
}
public boolean isTimestampSet()
@@ -129,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return attrs.isTimestampSet();
}
- public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+ public int getTimeToLive(QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimeToLive(variables);
+ return attrs.getTimeToLive(options);
}
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -284,7 +284,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
}
- public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+ public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
throws InvalidRequestException
{
CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
@@ -295,7 +295,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (r == null)
throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (keyBuilder.remainingCount() == 1)
{
@@ -321,7 +321,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return keys;
}
- public Composite createClusteringPrefix(List<ByteBuffer> variables)
+ public Composite createClusteringPrefix(QueryOptions options)
throws InvalidRequestException
{
// If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -353,10 +353,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
}
- return createClusteringPrefixBuilderInternal(variables);
+ return createClusteringPrefixBuilderInternal(options);
}
- private Composite createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+ private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
throws InvalidRequestException
{
CBuilder builder = cfm.comparator.prefixBuilder();
@@ -376,7 +376,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
else
{
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
assert values.size() == 1; // We only allow IN for row keys so far
ByteBuffer val = values.get(0);
if (val == null)
@@ -488,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+ Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -498,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = options.getValues();
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
// We don't support IN for CAS operations so far
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
ByteBuffer key = keys.get(0);
- CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
- Composite prefix = createClusteringPrefix(variables);
+ long now = options.getTimestamp(queryState);
+ CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+ Composite prefix = createClusteringPrefix(options);
ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
- addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+ addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
@@ -521,10 +521,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return new ResultMessage.Rows(buildCasResultSet(key, result));
}
- public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+ public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
throws InvalidRequestException
{
- UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+ UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
addUpdateForKey(updates, key, clusteringPrefix, updParams);
if (ifNotExists)
@@ -541,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
{
if (columnConditions != null)
- conditions.addConditions(clusteringPrefix, columnConditions, variables);
+ conditions.addConditions(clusteringPrefix, columnConditions, options);
if (staticConditions != null)
- conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, variables);
+ conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
}
}
@@ -614,7 +614,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
long now = System.currentTimeMillis();
Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
- SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+ SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
return builder.build();
}
@@ -624,7 +624,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+ for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
{
// We don't use counters internally.
assert mutation instanceof Mutation;
@@ -636,7 +636,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
/**
* Convert statement into a list of mutations to apply on the server
*
- * @param variables value for prepared statement markers
+ * @param options value for prepared statement markers
* @param local if true, any requests (for collections) performed by getMutation should be done locally only.
* @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
* @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -644,13 +644,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
- Composite clusteringPrefix = createClusteringPrefix(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
+ Composite clusteringPrefix = createClusteringPrefix(options);
- UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
@@ -659,22 +659,21 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
Mutation mut = new Mutation(cfm.ksName, key, cf);
- mutations.add(isCounter() ? new CounterMutation(mut, cl) : mut);
+ mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
return mutations;
}
public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
Composite prefix,
- List<ByteBuffer> variables,
+ QueryOptions options,
boolean local,
- ConsistencyLevel cl,
long now)
throws RequestExecutionException, RequestValidationException
{
// Some list operations require reading
- Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, cl);
- return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+ return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9872b74e/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 6b7eca7..4fd02c1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -42,7 +42,7 @@ public interface Restriction
public boolean isContains();
// Not supported by Slice, but it's convenient to have here
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException;
public static class EQ implements Restriction
{
@@ -55,9 +55,9 @@ public interface Restriction
this.onToken = onToken;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
- return Collections.singletonList(value.bindAndGet(variables));
+ return Collections.singletonList(value.bindAndGet(options));
}
public boolean isSlice()
@@ -145,11 +145,11 @@ public interface Restriction
this.values = values;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
for (Term value : values)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}
@@ -174,9 +174,9 @@ public interface Restriction
this.marker = marker;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
- Lists.Value lval = marker.bind(variables);
+ Lists.Value lval = marker.bind(options);
if (lval == null)
throw new InvalidRequestException("Invalid null value for IN restriction");
return lval.elements;
@@ -234,7 +234,7 @@ public interface Restriction
return false;
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
throw new UnsupportedOperationException();
}
@@ -249,9 +249,9 @@ public interface Restriction
return bounds[b.idx] != null;
}
- public ByteBuffer bound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
+ public ByteBuffer bound(Bound b, QueryOptions options) throws InvalidRequestException
{
- return bounds[b.idx].bindAndGet(variables);
+ return bounds[b.idx].bindAndGet(options);
}
public boolean isInclusive(Bound b)
@@ -379,25 +379,25 @@ public interface Restriction
keys.add(t);
}
- public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
{
if (values == null)
return Collections.emptyList();
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
for (Term value : values)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}
- public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+ public List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
{
if (keys == null)
return Collections.emptyList();
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
for (Term value : keys)
- buffers.add(value.bindAndGet(variables));
+ buffers.add(value.bindAndGet(options));
return buffers;
}