You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2014/05/07 22:57:33 UTC
git commit: Fix broken build
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 0cb1db68f -> 60dbe8b70
Fix broken build
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60dbe8b7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60dbe8b7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60dbe8b7
Branch: refs/heads/cassandra-2.1
Commit: 60dbe8b700ee0ee1a15fbcb94f9543d1477de7a3
Parents: 0cb1db6
Author: Jake Luciani <ja...@apache.org>
Authored: Wed May 7 16:55:53 2014 -0400
Committer: Jake Luciani <ja...@apache.org>
Committed: Wed May 7 16:55:53 2014 -0400
----------------------------------------------------------------------
.../cql3/statements/ModificationStatement.java | 338 ++++++++-----------
1 file changed, 141 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60dbe8b7/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 23f7cfe..7f8b678 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -23,23 +23,18 @@ import java.util.*;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.github.jamm.MemoryMeter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
@@ -54,21 +49,16 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
- private static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
-
- private static boolean loggedCounterTTL = false;
- private static boolean loggedCounterTimestamp = false;
-
public static enum StatementType { INSERT, UPDATE, DELETE }
public final StatementType type;
+ private final int boundTerms;
public final CFMetaData cfm;
public final Attributes attrs;
private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
private final List<Operation> columnOperations = new ArrayList<Operation>();
- private int boundTerms;
// Separating normal and static conditions makes things somewhat easier
private List<ColumnCondition> columnConditions;
private List<ColumnCondition> staticConditions;
@@ -80,17 +70,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
private boolean setsStaticColumns;
private boolean setsRegularColumns;
- private final Function<ColumnCondition, ColumnIdentifier> getColumnForCondition = new Function<ColumnCondition, ColumnIdentifier>()
+ private final Function<ColumnCondition, ColumnDefinition> getColumnForCondition = new Function<ColumnCondition, ColumnDefinition>()
{
- public ColumnIdentifier apply(ColumnCondition cond)
+ public ColumnDefinition apply(ColumnCondition cond)
{
- return cond.column.name;
+ return cond.column;
}
};
- public ModificationStatement(StatementType type, CFMetaData cfm, Attributes attrs)
+ public ModificationStatement(StatementType type, int boundTerms, CFMetaData cfm, Attributes attrs)
{
this.type = type;
+ this.boundTerms = boundTerms;
this.cfm = cfm;
this.attrs = attrs;
}
@@ -106,7 +97,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
public abstract boolean requireFullClusteringKey();
- public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+ public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
public int getBoundTerms()
{
@@ -125,12 +116,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public boolean isCounter()
{
- return cfm.getDefaultValidator().isCommutative();
+ return cfm.isCounter();
}
- public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+ public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimestamp(now, variables);
+ return attrs.getTimestamp(now, options);
}
public boolean isTimestampSet()
@@ -138,9 +129,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return attrs.isTimestampSet();
}
- public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
+ public int getTimeToLive(QueryOptions options) throws InvalidRequestException
{
- return attrs.getTimeToLive(variables);
+ return attrs.getTimeToLive(options);
}
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@ -155,31 +146,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public void validate(ClientState state) throws InvalidRequestException
{
if (hasConditions() && attrs.isTimestampSet())
- throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+ throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
- if (isCounter())
- {
- if (attrs.isTimestampSet() && !loggedCounterTimestamp)
- {
- logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This is invalid " +
- "because counters do not use timestamps, and the timestamp has been ignored. " +
- "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
- loggedCounterTimestamp = true;
- }
+ if (isCounter() && attrs.isTimestampSet())
+ throw new InvalidRequestException("Cannot provide custom timestamp for counter updates");
- if (attrs.isTimeToLiveSet() && !loggedCounterTTL)
- {
- logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid " +
- "because counter tables do not support TTL, and the TTL value has been ignored. " +
- "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
- loggedCounterTTL = true;
- }
- }
+ if (isCounter() && attrs.isTimeToLiveSet())
+ throw new InvalidRequestException("Cannot provide custom TTL for counter updates");
}
public void addOperation(Operation op)
{
- if (op.isStatic(cfm))
+ if (op.column.isStatic())
setsStaticColumns = true;
else
setsRegularColumns = true;
@@ -191,19 +169,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return columnOperations;
}
- public Iterable<ColumnIdentifier> getColumnsWithConditions()
+ public Iterable<ColumnDefinition> getColumnsWithConditions()
{
if (ifNotExists || ifExists)
return null;
- return Iterables.concat(columnConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
- staticConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
+ return Iterables.concat(columnConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
+ staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
}
public void addCondition(ColumnCondition cond) throws InvalidRequestException
{
List<ColumnCondition> conds = null;
- if (cond.column.kind == CFDefinition.Name.Kind.STATIC)
+ if (cond.column.isStatic())
{
setsStaticColumns = true;
if (staticConditions == null)
@@ -240,45 +218,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return ifExists;
}
- private void addKeyValues(CFDefinition.Name name, Restriction values) throws InvalidRequestException
+ private void addKeyValues(ColumnDefinition def, Restriction values) throws InvalidRequestException
{
- if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
+ if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
hasNoClusteringColumns = false;
- if (processedKeys.put(name.name, values) != null)
- throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name.name));
+ if (processedKeys.put(def.name, values) != null)
+ throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", def.name));
}
- public void addKeyValue(CFDefinition.Name name, Term value) throws InvalidRequestException
+ public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException
{
- addKeyValues(name, new Restriction.EQ(value, false));
+ addKeyValues(def, new Restriction.EQ(value, false));
}
public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
{
- CFDefinition cfDef = cfm.getCfDef();
for (Relation rel : whereClause)
{
- CFDefinition.Name name = cfDef.get(rel.getEntity());
- if (name == null)
+ ColumnDefinition def = cfm.getColumnDefinition(rel.getEntity());
+ if (def == null)
throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
- switch (name.kind)
+ switch (def.kind)
{
- case KEY_ALIAS:
- case COLUMN_ALIAS:
+ case PARTITION_KEY:
+ case CLUSTERING_COLUMN:
Restriction restriction;
if (rel.operator() == Relation.Type.EQ)
{
- Term t = rel.getValue().prepare(name);
+ Term t = rel.getValue().prepare(keyspace(), def);
t.collectMarkerSpecification(names);
restriction = new Restriction.EQ(t, false);
}
- else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
+ else if (def.kind == ColumnDefinition.Kind.PARTITION_KEY && rel.operator() == Relation.Type.IN)
{
if (rel.getValue() != null)
{
- Term t = rel.getValue().prepare(name);
+ Term t = rel.getValue().prepare(keyspace(), def);
t.collectMarkerSpecification(names);
restriction = Restriction.IN.create(t);
}
@@ -287,7 +264,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
List<Term> values = new ArrayList<Term>(rel.getInValues().size());
for (Term.Raw raw : rel.getInValues())
{
- Term t = raw.prepare(name);
+ Term t = raw.prepare(keyspace(), def);
t.collectMarkerSpecification(names);
values.add(t);
}
@@ -296,40 +273,37 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
else
{
- throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
+ throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), def.name));
}
- addKeyValues(name, restriction);
+ addKeyValues(def, restriction);
break;
- case VALUE_ALIAS:
- case COLUMN_METADATA:
- case STATIC:
- throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
+ default:
+ throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", def.name));
}
}
}
- public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+ public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
throws InvalidRequestException
{
- CFDefinition cfDef = cfm.getCfDef();
- ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+ CBuilder keyBuilder = cfm.getKeyValidatorAsCType().builder();
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
- for (CFDefinition.Name name : cfDef.partitionKeys())
+ for (ColumnDefinition def : cfm.partitionKeyColumns())
{
- Restriction r = processedKeys.get(name.name);
+ Restriction r = processedKeys.get(def.name);
if (r == null)
- throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
if (keyBuilder.remainingCount() == 1)
{
for (ByteBuffer val : values)
{
if (val == null)
- throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
- ByteBuffer key = keyBuilder.copy().add(val).build();
+ throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
+ ByteBuffer key = keyBuilder.buildWith(val).toByteBuffer();
ThriftValidation.validateKey(cfm, key);
keys.add(key);
}
@@ -340,14 +314,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new InvalidRequestException("IN is only supported on the last column of the partition key");
ByteBuffer val = values.get(0);
if (val == null)
- throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+ throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", def.name));
keyBuilder.add(val);
}
}
return keys;
}
- public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+ public Composite createClusteringPrefix(QueryOptions options)
throws InvalidRequestException
{
// If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -364,96 +338,83 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
// If we set no non-static columns, then it's fine not to have clustering columns
if (hasNoClusteringColumns)
- return cfm.getStaticColumnNameBuilder();
+ return cfm.comparator.staticPrefix();
// If we do have clustering columns however, then either it's an INSERT and the query is valid
// but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject
// (see above)
if (type != StatementType.INSERT)
{
- for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
- if (processedKeys.get(name.name) != null)
- throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", name.name, type));
+ for (ColumnDefinition def : cfm.clusteringColumns())
+ if (processedKeys.get(def.name) != null)
+ throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", def.name, type));
// we should not get here, as that would contradict hasNoClusteringColumns == false
throw new AssertionError();
}
}
- return createClusteringPrefixBuilderInternal(variables);
+ return createClusteringPrefixBuilderInternal(options);
}
- private ColumnNameBuilder updatePrefixFor(ByteBuffer name, ColumnNameBuilder prefix)
- {
- return isStatic(name) ? cfm.getStaticColumnNameBuilder() : prefix;
- }
-
- public boolean isStatic(ByteBuffer name)
- {
- ColumnDefinition def = cfm.getColumnDefinition(name);
- return def != null && def.type == ColumnDefinition.Type.STATIC;
- }
-
- private ColumnNameBuilder createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+ private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
throws InvalidRequestException
{
- CFDefinition cfDef = cfm.getCfDef();
- ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
- CFDefinition.Name firstEmptyKey = null;
- for (CFDefinition.Name name : cfDef.clusteringColumns())
+ CBuilder builder = cfm.comparator.prefixBuilder();
+ ColumnDefinition firstEmptyKey = null;
+ for (ColumnDefinition def : cfm.clusteringColumns())
{
- Restriction r = processedKeys.get(name.name);
+ Restriction r = processedKeys.get(def.name);
if (r == null)
{
- firstEmptyKey = name;
- if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
- throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+ firstEmptyKey = def;
+ if (requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound())
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", def.name));
}
else if (firstEmptyKey != null)
{
- throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name));
+ throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, def.name));
}
else
{
- List<ByteBuffer> values = r.values(variables);
+ List<ByteBuffer> values = r.values(options);
assert values.size() == 1; // We only allow IN for row keys so far
ByteBuffer val = values.get(0);
if (val == null)
- throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
+ throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", def.name));
builder.add(val);
}
}
- return builder;
+ return builder.build();
}
- protected CFDefinition.Name getFirstEmptyKey()
+ protected ColumnDefinition getFirstEmptyKey()
{
- for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
+ for (ColumnDefinition def : cfm.clusteringColumns())
{
- if (processedKeys.get(name.name) == null)
- return name;
+ if (processedKeys.get(def.name) == null)
+ return def;
}
return null;
}
- protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
// Lists SET operation incurs a read.
- Set<ByteBuffer> toRead = null;
+ boolean requiresRead = false;
for (Operation op : columnOperations)
{
if (op.requiresRead())
{
- if (toRead == null)
- toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
- toRead.add(op.columnName.key);
+ requiresRead = true;
+ break;
}
}
- return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
+ return requiresRead ? readRows(partitionKeys, clusteringPrefix, cfm, local, cl) : null;
}
- private Map<ByteBuffer, ColumnGroupMap> readRows(Collection<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+ protected Map<ByteBuffer, CQL3Row> readRows(Collection<ByteBuffer> partitionKeys, Composite rowPrefix, CFMetaData cfm, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@ -465,16 +426,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
}
- ColumnSlice[] slices = new ColumnSlice[toRead.size()];
- int i = 0;
- for (ByteBuffer name : toRead)
- {
- ColumnNameBuilder prefix = updatePrefixFor(name, clusteringPrefix);
- ByteBuffer start = prefix.copy().add(name).build();
- ByteBuffer finish = prefix.copy().add(name).buildAsEndOfRange();
- slices[i++] = new ColumnSlice(start, finish);
- }
-
+ ColumnSlice[] slices = new ColumnSlice[]{ rowPrefix.slice() };
List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
long now = System.currentTimeMillis();
for (ByteBuffer key : partitionKeys)
@@ -488,20 +440,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
? SelectStatement.readLocally(keyspace(), commands)
: StorageProxy.read(commands, cl);
- Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+ Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
for (Row row : rows)
{
- if (row.cf == null || row.cf.getColumnCount() == 0)
+ if (row.cf == null || row.cf.isEmpty())
continue;
- ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true, now);
- for (Column column : row.cf)
- groupBuilder.add(column);
-
- List<ColumnGroupMap> groups = groupBuilder.groups();
- assert groups.isEmpty() || groups.size() == 1;
- if (!groups.isEmpty())
- map.put(row.key.key, groups.get(0));
+ Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
+ if (iter.hasNext())
+ {
+ map.put(row.key.getKey(), iter.next());
+ // We can only update one CQL3Row per partition key at a time (we don't allow IN for clustering key)
+ assert !iter.hasNext();
+ }
}
return map;
}
@@ -537,7 +488,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
cl.validateForWrite(cfm.ksName);
- Collection<? extends IMutation> mutations = getMutations(options.getValues(), false, cl, queryState.getTimestamp());
+ Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
if (!mutations.isEmpty())
StorageProxy.mutateWithTriggers(mutations, cl, false);
@@ -547,18 +498,18 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> variables = options.getValues();
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
// We don't support IN for CAS operation so far
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
ByteBuffer key = keys.get(0);
- CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp());
- ColumnNameBuilder prefix = createClusteringPrefixBuilder(variables);
- ColumnFamily updates = UnsortedColumns.factory.create(cfm);
- addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
+ long now = options.getTimestamp(queryState);
+ CQL3CasConditions conditions = new CQL3CasConditions(cfm, now);
+ Composite prefix = createClusteringPrefix(options);
+ ColumnFamily updates = ArrayBackedSortedColumns.factory.create(cfm);
+ addUpdatesAndConditions(key, prefix, updates, conditions, options, getTimestamp(now, options));
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
@@ -570,16 +521,16 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return new ResultMessage.Rows(buildCasResultSet(key, result));
}
- public void addUpdatesAndConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+ public void addUpdatesAndConditions(ByteBuffer key, Composite clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, QueryOptions options, long now)
throws InvalidRequestException
{
- UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+ UpdateParameters updParams = new UpdateParameters(cfm, options, now, getTimeToLive(options), null);
addUpdateForKey(updates, key, clusteringPrefix, updParams);
if (ifNotExists)
{
// If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
- // columns and the prefix should be the rowPrefix. But if only static columns are set, then the ifNotExists apply to the existence
+ // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence
// of any static columns and we should use the prefix for the "static part" of the partition.
conditions.addNotExist(clusteringPrefix);
}
@@ -590,9 +541,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else
{
if (columnConditions != null)
- conditions.addConditions(clusteringPrefix, columnConditions, variables);
+ conditions.addConditions(clusteringPrefix, columnConditions, options);
if (staticConditions != null)
- conditions.addConditions(cfm.getStaticColumnNameBuilder(), staticConditions, variables);
+ conditions.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
}
}
@@ -601,7 +552,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false);
}
- public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+ public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch)
throws InvalidRequestException
{
boolean success = cf == null;
@@ -637,34 +588,33 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return new ResultSet(new ResultSet.Metadata(specs), rows);
}
- private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+ private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch)
throws InvalidRequestException
{
- CFDefinition cfDef = cf.metadata().getCfDef();
-
+ CFMetaData cfm = cf.metadata();
Selection selection;
if (columnsWithConditions == null)
{
- selection = Selection.wildcard(cfDef);
+ selection = Selection.wildcard(cfm);
}
else
{
- List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>();
+ List<ColumnDefinition> defs = new ArrayList<>();
// Adding the partition key for batches to disambiguate if the conditions span multiple rows (we don't add them outside
// of batches for compatibility's sake).
if (isBatch)
{
- names.addAll(cfDef.partitionKeys());
- names.addAll(cfDef.clusteringColumns());
+ defs.addAll(cfm.partitionKeyColumns());
+ defs.addAll(cfm.clusteringColumns());
}
- for (ColumnIdentifier id : columnsWithConditions)
- names.add(cfDef.get(id));
- selection = Selection.forColumns(names);
+ for (ColumnDefinition def : columnsWithConditions)
+ defs.add(def);
+ selection = Selection.forColumns(defs);
}
long now = System.currentTimeMillis();
Selection.ResultSetBuilder builder = selection.resultSetBuilder(now);
- SelectStatement.forSelection(cfDef, selection).processColumnFamily(key, cf, Collections.<ByteBuffer>emptyList(), now, builder);
+ SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, QueryOptions.DEFAULT, now, builder);
return builder.build();
}
@@ -674,15 +624,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (hasConditions())
throw new UnsupportedOperationException();
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
- mutation.apply();
+ for (IMutation mutation : getMutations(QueryOptions.DEFAULT, true, queryState.getTimestamp()))
+ {
+ // We don't use counters internally.
+ assert mutation instanceof Mutation;
+ ((Mutation) mutation).apply();
+ }
return null;
}
/**
* Convert statement into a list of mutations to apply on the server
*
- * @param variables value for prepared statement markers
+ * @param options value for prepared statement markers
* @param local if true, any requests (for collections) performed by getMutation should be done locally only.
* @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations)
* @param now the current timestamp in microseconds to use if no timestamp is user provided.
@@ -690,37 +644,36 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
throws RequestExecutionException, RequestValidationException
{
- List<ByteBuffer> keys = buildPartitionKeyNames(variables);
- ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+ List<ByteBuffer> keys = buildPartitionKeyNames(options);
+ Composite clusteringPrefix = createClusteringPrefix(options);
- UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, variables, local, cl, now);
+ UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
Collection<IMutation> mutations = new ArrayList<IMutation>();
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
- ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
addUpdateForKey(cf, key, clusteringPrefix, params);
- RowMutation rm = new RowMutation(cfm.ksName, key, cf);
- mutations.add(isCounter() ? new CounterMutation(rm, cl) : rm);
+ Mutation mut = new Mutation(cfm.ksName, key, cf);
+ mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
return mutations;
}
public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
- ColumnNameBuilder prefix,
- List<ByteBuffer> variables,
+ Composite prefix,
+ QueryOptions options,
boolean local,
- ConsistencyLevel cl,
long now)
throws RequestExecutionException, RequestValidationException
{
// Some lists operation requires reading
- Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, prefix, local, cl);
- return new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+ Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
+ return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
}
public static abstract class Parsed extends CFStatement
@@ -749,16 +702,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
- CFDefinition cfDef = metadata.getCfDef();
-
- // The collected count in the beginning of preparation.
- // Will start at non-zero for statements nested inside a BatchStatement (the second and the further ones).
- int collected = boundNames.getCollectedCount();
Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
preparedAttributes.collectMarkerSpecification(boundNames);
- ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
+ ModificationStatement stmt = prepareInternal(metadata, boundNames, preparedAttributes);
if (ifNotExists || ifExists || !conditions.isEmpty())
{
@@ -766,7 +714,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new InvalidRequestException("Conditional updates are not supported on counter tables");
if (attrs.timestamp != null)
- throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+ throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
if (ifNotExists)
{
@@ -786,32 +734,28 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
for (Pair<ColumnIdentifier, ColumnCondition.Raw> entry : conditions)
{
- CFDefinition.Name name = cfDef.get(entry.left);
- if (name == null)
+ ColumnDefinition def = metadata.getColumnDefinition(entry.left);
+ if (def == null)
throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
- ColumnCondition condition = entry.right.prepare(name);
+ ColumnCondition condition = entry.right.prepare(keyspace(), def);
condition.collectMarkerSpecification(boundNames);
- switch (name.kind)
+ switch (def.kind)
{
- case KEY_ALIAS:
- case COLUMN_ALIAS:
+ case PARTITION_KEY:
+ case CLUSTERING_COLUMN:
throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
- case VALUE_ALIAS:
- case COLUMN_METADATA:
- case STATIC:
+ default:
stmt.addCondition(condition);
break;
}
}
}
}
-
- stmt.boundTerms = boundNames.getCollectedCount() - collected;
return stmt;
}
- protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
+ protected abstract ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
}
}