You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/04 21:14:21 UTC
[3/5] cassandra git commit: Allow range deletions in CQL
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 5fa1842..9ddf7b8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,29 +21,30 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.MaterializedViewDefinition;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier.Raw;
import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.cql3.restrictions.Restriction;
-import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.BooleanType;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -51,11 +52,10 @@ import org.apache.cassandra.triggers.TriggerExecutor;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.btree.BTreeSet;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
-import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
/*
* Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -66,75 +66,89 @@ public abstract class ModificationStatement implements CQLStatement
private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
- public static enum StatementType { INSERT, UPDATE, DELETE }
- public final StatementType type;
+ protected final StatementType type;
private final int boundTerms;
public final CFMetaData cfm;
- public final Attributes attrs;
+ private final Attributes attrs;
- protected final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<>();
- private final List<Operation> regularOperations = new ArrayList<>();
- private final List<Operation> staticOperations = new ArrayList<>();
+ private final StatementRestrictions restrictions;
- // TODO: If we had a builder for this statement, we could have updatedColumns/conditionColumns final and only have
- // updatedColumnsBuilder/conditionColumnsBuilder in the builder ...
- private PartitionColumns updatedColumns;
- private PartitionColumns.Builder updatedColumnsBuilder = PartitionColumns.builder();
- private PartitionColumns conditionColumns;
- private PartitionColumns.Builder conditionColumnsBuilder = PartitionColumns.builder();
+ private final Operations operations;
- // Separating normal and static conditions makes things somewhat easier
- private List<ColumnCondition> columnConditions;
- private List<ColumnCondition> staticConditions;
- private boolean ifNotExists;
- private boolean ifExists;
+ private final PartitionColumns updatedColumns;
- private boolean hasNoClusteringColumns = true;
+ private final Conditions conditions;
- private boolean setsStaticColumns;
- private boolean setsRegularColumns;
+ private final PartitionColumns conditionColumns;
- private final com.google.common.base.Function<ColumnCondition, ColumnDefinition> getColumnForCondition =
- new com.google.common.base.Function<ColumnCondition, ColumnDefinition>()
- {
- public ColumnDefinition apply(ColumnCondition cond)
- {
- return cond.column;
- }
- };
+ private final PartitionColumns requiresRead;
- public ModificationStatement(StatementType type, int boundTerms, CFMetaData cfm, Attributes attrs)
+ public ModificationStatement(StatementType type,
+ int boundTerms,
+ CFMetaData cfm,
+ Operations operations,
+ StatementRestrictions restrictions,
+ Conditions conditions,
+ Attributes attrs)
{
this.type = type;
this.boundTerms = boundTerms;
this.cfm = cfm;
+ this.restrictions = restrictions;
+ this.operations = operations;
+ this.conditions = conditions;
this.attrs = attrs;
- }
- public Iterable<Function> getFunctions()
- {
- Iterable<Function> functions = attrs.getFunctions();
+ if (!conditions.isEmpty())
+ {
+ checkFalse(cfm.isCounter(), "Conditional updates are not supported on counter tables");
+ checkFalse(attrs.isTimestampSet(), "Cannot provide custom timestamp for conditional updates");
+ }
- for (Restriction restriction : processedKeys.values())
- functions = Iterables.concat(functions, restriction.getFunctions());
+ PartitionColumns.Builder conditionColumnsBuilder = PartitionColumns.builder();
+ Iterable<ColumnDefinition> columns = conditions.getColumns();
+ if (columns != null)
+ conditionColumnsBuilder.addAll(columns);
- for (Operation operation : allOperations())
- functions = Iterables.concat(functions, operation.getFunctions());
+ PartitionColumns.Builder updatedColumnsBuilder = PartitionColumns.builder();
+ PartitionColumns.Builder requiresReadBuilder = PartitionColumns.builder();
+ for (Operation operation : operations)
+ {
+ updatedColumnsBuilder.add(operation.column);
+ // If the operation requires a read-before-write and we're doing a conditional read, we want to read
+ // the affected column as part of the read-for-conditions paxos phase (see #7499).
+ if (operation.requiresRead())
+ {
+ conditionColumnsBuilder.add(operation.column);
+ requiresReadBuilder.add(operation.column);
+ }
+ }
- for (ColumnCondition condition : allConditions())
- functions = Iterables.concat(functions, condition.getFunctions());
+ PartitionColumns modifiedColumns = updatedColumnsBuilder.build();
+ // Compact tables have not row marker. So if we don't actually update any particular column,
+ // this means that we're only updating the PK, which we allow if only those were declared in
+ // the definition. In that case however, we do went to write the compactValueColumn (since again
+ // we can't use a "row marker") so add it automatically.
+ if (cfm.isCompactTable() && modifiedColumns.isEmpty() && updatesRegularRows())
+ modifiedColumns = cfm.partitionColumns();
- return functions;
+ this.updatedColumns = modifiedColumns;
+ this.conditionColumns = conditionColumnsBuilder.build();
+ this.requiresRead = requiresReadBuilder.build();
}
- public boolean hasNoClusteringColumns()
+ public Iterable<Function> getFunctions()
{
- return hasNoClusteringColumns;
+ return Iterables.concat(attrs.getFunctions(),
+ restrictions.getFunctions(),
+ operations.getFunctions(),
+ conditions.getFunctions());
}
- public abstract boolean requireFullClusteringKey();
- public abstract void addUpdateForKey(PartitionUpdate update, CBuilder clusteringBuilder, UpdateParameters params) throws InvalidRequestException;
+ public abstract void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params);
+
+ public abstract void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params);
public int getBoundTerms()
{
@@ -204,37 +218,10 @@ public abstract class ModificationStatement implements CQLStatement
public void validate(ClientState state) throws InvalidRequestException
{
- if (hasConditions() && attrs.isTimestampSet())
- throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
-
- if (isCounter() && attrs.isTimestampSet())
- throw new InvalidRequestException("Cannot provide custom timestamp for counter updates");
-
- if (isCounter() && attrs.isTimeToLiveSet())
- throw new InvalidRequestException("Cannot provide custom TTL for counter updates");
-
- if (isMaterializedView())
- throw new InvalidRequestException("Cannot directly modify a materialized view");
- }
-
- public void addOperation(Operation op)
- {
- updatedColumnsBuilder.add(op.column);
- // If the operation requires a read-before-write and we're doing a conditional read, we want to read
- // the affected column as part of the read-for-conditions paxos phase (see #7499).
- if (op.requiresRead())
- conditionColumnsBuilder.add(op.column);
-
- if (op.column.isStatic())
- {
- setsStaticColumns = true;
- staticOperations.add(op);
- }
- else
- {
- setsRegularColumns = true;
- regularOperations.add(op);
- }
+ checkFalse(hasConditions() && attrs.isTimestampSet(), "Cannot provide custom timestamp for conditional updates");
+ checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates");
+ checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates");
+ checkFalse(isMaterializedView(), "Cannot directly modify a materialized view");
}
public PartitionColumns updatedColumns()
@@ -254,249 +241,77 @@ public abstract class ModificationStatement implements CQLStatement
// columns is if we set some static columns, and in that case no clustering
// columns should be given. So in practice, it's enough to check if we have
// either the table has no clustering or if it has at least one of them set.
- return cfm.clusteringColumns().isEmpty() || !hasNoClusteringColumns;
+ return cfm.clusteringColumns().isEmpty() || restrictions.hasClusteringColumnsRestriction();
}
public boolean updatesStaticRow()
{
- return !staticOperations.isEmpty();
- }
-
- private void finishPreparation()
- {
- updatedColumns = updatedColumnsBuilder.build();
- // Compact tables have not row marker. So if we don't actually update any particular column,
- // this means that we're only updating the PK, which we allow if only those were declared in
- // the definition. In that case however, we do went to write the compactValueColumn (since again
- // we can't use a "row marker") so add it automatically.
- if (cfm.isCompactTable() && updatedColumns.isEmpty() && updatesRegularRows())
- updatedColumns = cfm.partitionColumns();
-
- conditionColumns = conditionColumnsBuilder.build();
+ return operations.appliesToStaticColumns();
}
public List<Operation> getRegularOperations()
{
- return regularOperations;
+ return operations.regularOperations();
}
public List<Operation> getStaticOperations()
{
- return staticOperations;
+ return operations.staticOperations();
}
public Iterable<Operation> allOperations()
{
- return Iterables.concat(staticOperations, regularOperations);
+ return operations;
}
public Iterable<ColumnDefinition> getColumnsWithConditions()
{
- if (ifNotExists || ifExists)
- return null;
-
- return Iterables.concat(columnConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
- staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
- }
-
- public Iterable<ColumnCondition> allConditions()
- {
- if (staticConditions == null)
- return columnConditions == null ? Collections.<ColumnCondition>emptySet(): columnConditions;
- if (columnConditions == null)
- return staticConditions;
- return Iterables.concat(staticConditions, columnConditions);
- }
-
- public void addCondition(ColumnCondition cond)
- {
- conditionColumnsBuilder.add(cond.column);
-
- List<ColumnCondition> conds = null;
- if (cond.column.isStatic())
- {
- setsStaticColumns = true;
- if (staticConditions == null)
- staticConditions = new ArrayList<ColumnCondition>();
- conds = staticConditions;
- }
- else
- {
- setsRegularColumns = true;
- if (columnConditions == null)
- columnConditions = new ArrayList<ColumnCondition>();
- conds = columnConditions;
- }
- conds.add(cond);
- }
-
- public void setIfNotExistCondition()
- {
- ifNotExists = true;
+ return conditions.getColumns();
}
public boolean hasIfNotExistCondition()
{
- return ifNotExists;
- }
-
- public void setIfExistCondition()
- {
- ifExists = true;
+ return conditions.isIfNotExists();
}
public boolean hasIfExistCondition()
{
- return ifExists;
- }
-
- private void addKeyValues(ColumnDefinition def, Restriction values) throws InvalidRequestException
- {
- if (def.kind == ColumnDefinition.Kind.CLUSTERING)
- hasNoClusteringColumns = false;
- if (processedKeys.put(def.name, values) != null)
- throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", def.name));
- }
-
- public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException
- {
- addKeyValues(def, new SingleColumnRestriction.EQRestriction(def, value));
- }
-
- public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
- {
- for (Relation relation : whereClause)
- {
- if (relation.isMultiColumn())
- {
- throw new InvalidRequestException(
- String.format("Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", relation));
- }
- SingleColumnRelation rel = (SingleColumnRelation) relation;
-
- if (rel.onToken())
- throw new InvalidRequestException(String.format("The token function cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", relation));
-
- ColumnIdentifier id = rel.getEntity().prepare(cfm);
- ColumnDefinition def = cfm.getColumnDefinition(id);
- if (def == null)
- throw new InvalidRequestException(String.format("Unknown key identifier %s", id));
-
- switch (def.kind)
- {
- case PARTITION_KEY:
- case CLUSTERING:
- Restriction restriction;
-
- if (rel.isEQ() || (def.isPartitionKey() && rel.isIN()))
- {
- restriction = rel.toRestriction(cfm, names);
- }
- else
- {
- throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), def.name));
- }
-
- addKeyValues(def, restriction);
- break;
- default:
- throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", def.name));
- }
- }
+ return conditions.isIfExists();
}
public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
throws InvalidRequestException
{
- MultiCBuilder keyBuilder = MultiCBuilder.create(cfm.getKeyValidatorAsClusteringComparator());
- for (ColumnDefinition def : cfm.partitionKeyColumns())
- {
- Restriction r = checkNotNull(processedKeys.get(def.name), "Missing mandatory PRIMARY KEY part %s", def.name);
- r.appendTo(keyBuilder, options);
- }
-
- NavigableSet<Clustering> clusterings = keyBuilder.build();
- List<ByteBuffer> keys = new ArrayList<ByteBuffer>(clusterings.size());
- for (Clustering clustering : clusterings)
- {
- ByteBuffer key = CFMetaData.serializePartitionKey(clustering);
- ThriftValidation.validateKey(cfm, key);
- keys.add(key);
- }
- return keys;
+ return restrictions.getPartitionKeys(options);
}
- public CBuilder createClustering(QueryOptions options)
+ public NavigableSet<Clustering> createClustering(QueryOptions options)
throws InvalidRequestException
{
- // If the only updated/deleted columns are static, then we don't need clustering columns.
- // And in fact, unless it is an INSERT, we reject if clustering colums are provided as that
- // suggest something unintended. For instance, given:
- // CREATE TABLE t (k int, v int, s int static, PRIMARY KEY (k, v))
- // it can make sense to do:
- // INSERT INTO t(k, v, s) VALUES (0, 1, 2)
- // but both
- // UPDATE t SET s = 3 WHERE k = 0 AND v = 1
- // DELETE v FROM t WHERE k = 0 AND v = 1
- // sounds like you don't really understand what your are doing.
- if (setsStaticColumns && !setsRegularColumns)
- {
- // If we set no non-static columns, then it's fine not to have clustering columns
- if (hasNoClusteringColumns)
- return CBuilder.STATIC_BUILDER;
-
- // If we do have clustering columns however, then either it's an INSERT and the query is valid
- // but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject
- // (see above)
- if (type != StatementType.INSERT)
- {
- for (ColumnDefinition def : cfm.clusteringColumns())
- if (processedKeys.get(def.name) != null)
- throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", def.name, type));
- // we should get there as it contradicts hasNoClusteringColumns == false
- throw new AssertionError();
- }
- }
+ if (appliesOnlyToStaticColumns() && !restrictions.hasClusteringColumnsRestriction())
+ return FBUtilities.singleton(CBuilder.STATIC_BUILDER.build(), cfm.comparator);
- return createClusteringInternal(options);
+ return restrictions.getClusteringColumns(options);
}
- private CBuilder createClusteringInternal(QueryOptions options)
- throws InvalidRequestException
+ /**
+ * Checks that the modification only apply to static columns.
+ * @return <code>true</code> if the modification only apply to static columns, <code>false</code> otherwise.
+ */
+ private boolean appliesOnlyToStaticColumns()
{
- CBuilder builder = CBuilder.create(cfm.comparator);
- MultiCBuilder multiBuilder = MultiCBuilder.wrap(builder);
-
- ColumnDefinition firstEmptyKey = null;
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- Restriction r = processedKeys.get(def.name);
- if (r == null)
- {
- firstEmptyKey = def;
- checkFalse(requireFullClusteringKey() && !cfm.isDense() && cfm.isCompound(),
- "Missing mandatory PRIMARY KEY part %s", def.name);
- }
- else if (firstEmptyKey != null)
- {
- throw invalidRequest("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, def.name);
- }
- else
- {
- r.appendTo(multiBuilder, options);
- }
- }
- return builder;
+ return appliesOnlyToStaticColumns(operations, conditions);
}
- protected ColumnDefinition getFirstEmptyKey()
+ /**
+ * Checks that the specified operations and conditions only apply to static columns.
+ * @return <code>true</code> if the specified operations and conditions only apply to static columns,
+ * <code>false</code> otherwise.
+ */
+ public static boolean appliesOnlyToStaticColumns(Operations operation, Conditions conditions)
{
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- if (processedKeys.get(def.name) == null)
- return def;
- }
- return null;
+ return !operation.appliesToRegularColumns() && !conditions.appliesToRegularColumns()
+ && (operation.appliesToStaticColumns() || conditions.appliesToStaticColumns());
}
public boolean requiresRead()
@@ -509,8 +324,11 @@ public abstract class ModificationStatement implements CQLStatement
return false;
}
- protected Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys, CBuilder cbuilder, boolean local, ConsistencyLevel cl)
- throws RequestExecutionException, RequestValidationException
+ private Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys,
+ ClusteringIndexFilter filter,
+ DataLimits limits,
+ boolean local,
+ ConsistencyLevel cl)
{
if (!requiresRead())
return null;
@@ -524,27 +342,16 @@ public abstract class ModificationStatement implements CQLStatement
throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
}
- // TODO: no point in recomputing that every time. Should move to preparation phase.
- PartitionColumns.Builder builder = PartitionColumns.builder();
- for (Operation op : allOperations())
- if (op.requiresRead())
- builder.add(op.column);
-
- PartitionColumns toRead = builder.build();
-
- NavigableSet<Clustering> clusterings = BTreeSet.of(cfm.comparator, cbuilder.build());
List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(partitionKeys.size());
int nowInSec = FBUtilities.nowInSeconds();
for (ByteBuffer key : partitionKeys)
- commands.add(new SinglePartitionNamesCommand(cfm,
- nowInSec,
- ColumnFilter.selection(toRead),
- RowFilter.NONE,
- DataLimits.NONE,
- key,
- new ClusteringIndexNamesFilter(clusterings, false)));
-
- Map<DecoratedKey, Partition> map = new HashMap<>();
+ commands.add(SinglePartitionReadCommand.create(cfm,
+ nowInSec,
+ ColumnFilter.selection(this.requiresRead),
+ RowFilter.NONE,
+ limits,
+ cfm.decorateKey(key),
+ filter));
SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
@@ -555,18 +362,16 @@ public abstract class ModificationStatement implements CQLStatement
return asMaterializedMap(iter);
}
}
- else
+
+ try (PartitionIterator iter = group.execute(cl, null))
{
- try (PartitionIterator iter = group.execute(cl, null))
- {
- return asMaterializedMap(iter);
- }
+ return asMaterializedMap(iter);
}
}
private Map<DecoratedKey, Partition> asMaterializedMap(PartitionIterator iterator)
{
- Map<DecoratedKey, Partition> map = new HashMap();
+ Map<DecoratedKey, Partition> map = new HashMap<>();
while (iterator.hasNext())
{
try (RowIterator partition = iterator.next())
@@ -579,10 +384,7 @@ public abstract class ModificationStatement implements CQLStatement
public boolean hasConditions()
{
- return ifNotExists
- || ifExists
- || (columnConditions != null && !columnConditions.isEmpty())
- || (staticConditions != null && !staticConditions.isEmpty());
+ return !conditions.isEmpty();
}
public ResultMessage execute(QueryState queryState, QueryOptions options)
@@ -636,39 +438,31 @@ public abstract class ModificationStatement implements CQLStatement
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
// We don't support IN for CAS operation so far
- if (keys.size() > 1)
- throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
+ checkFalse(keys.size() > 1,
+ "IN on the partition key is not supported with conditional %s",
+ type.isUpdate()? "updates" : "deletions");
DecoratedKey key = cfm.decorateKey(keys.get(0));
long now = options.getTimestamp(queryState);
- CBuilder cbuilder = createClustering(options);
+ SortedSet<Clustering> clusterings = createClustering(options);
+
+ checkFalse(clusterings.size() > 1,
+ "IN on the clustering key columns is not supported with conditional %s",
+ type.isUpdate()? "updates" : "deletions");
+
+ Clustering clustering = Iterables.getOnlyElement(clusterings);
CQL3CasRequest request = new CQL3CasRequest(cfm, key, false, conditionColumns(), updatesRegularRows(), updatesStaticRow());
- addConditions(cbuilder.build(), request, options);
- request.addRowUpdate(cbuilder, this, options, now);
+
+ addConditions(clustering, request, options);
+ request.addRowUpdate(clustering, this, options, now);
+
return request;
}
public void addConditions(Clustering clustering, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
{
- if (ifNotExists)
- {
- // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
- // columns and the prefix should be the clustering. But if only static columns are set, then the ifNotExists apply to the existence
- // of any static columns and we should use the prefix for the "static part" of the partition.
- request.addNotExist(clustering);
- }
- else if (ifExists)
- {
- request.addExist(clustering);
- }
- else
- {
- if (columnConditions != null)
- request.addConditions(clustering, columnConditions, options);
- if (staticConditions != null)
- request.addConditions(Clustering.STATIC_CLUSTERING, staticConditions, options);
- }
+ conditions.addConditionsTo(request, clustering, options);
}
private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException
@@ -778,9 +572,8 @@ public abstract class ModificationStatement implements CQLStatement
static RowIterator casInternal(CQL3CasRequest request, QueryState state)
{
UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
- CFMetaData metadata = Schema.instance.getCFMetaData(request.cfm.ksName, request.cfm.cfName);
- SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+ SinglePartitionReadCommand<?> readCommand = request.readCommand(FBUtilities.nowInSeconds());
FilteredPartition current;
try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup))
{
@@ -806,55 +599,141 @@ public abstract class ModificationStatement implements CQLStatement
* @param now the current timestamp in microseconds to use if no timestamp is user provided.
*
* @return list of the mutations
- * @throws InvalidRequestException on invalid requests
*/
private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
- throws RequestExecutionException, RequestValidationException
+ {
+ UpdatesCollector collector = new UpdatesCollector(updatedColumns, 1);
+ addUpdates(collector, options, local, now);
+ return collector.toMutations();
+ }
+
+ final void addUpdates(UpdatesCollector collector,
+ QueryOptions options,
+ boolean local,
+ long now)
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
- CBuilder clustering = createClustering(options);
- UpdateParameters params = makeUpdateParameters(keys, clustering, options, local, now);
+ if (type.allowClusteringColumnSlices()
+ && restrictions.hasClusteringColumnsRestriction()
+ && restrictions.isColumnRange())
+ {
+ Slices slices = createSlice(options);
+
+ // If all the ranges were invalid we do not need to do anything.
+ if (slices.isEmpty())
+ return;
+
+ UpdateParameters params = makeUpdateParameters(keys,
+ new ClusteringIndexSliceFilter(slices, false),
+ options,
+ DataLimits.NONE,
+ local,
+ now);
+ for (ByteBuffer key : keys)
+ {
+ ThriftValidation.validateKey(cfm, key);
+ DecoratedKey dk = cfm.decorateKey(key);
+
+ PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, options.getConsistency());
- Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size());
- for (ByteBuffer key: keys)
+ for (Slice slice : slices)
+ addUpdateForKey(upd, slice, params);
+ }
+ }
+ else
{
- ThriftValidation.validateKey(cfm, key);
- PartitionUpdate upd = new PartitionUpdate(cfm, key, updatedColumns(), 1);
- addUpdateForKey(upd, clustering, params);
- Mutation mut = new Mutation(upd);
+ NavigableSet<Clustering> clusterings = createClustering(options);
+
+ UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now);
+
+ for (ByteBuffer key : keys)
+ {
+ ThriftValidation.validateKey(cfm, key);
+ DecoratedKey dk = cfm.decorateKey(key);
+
+ PartitionUpdate upd = collector.getPartitionUpdate(cfm, dk, options.getConsistency());
- mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
+ if (clusterings.isEmpty())
+ {
+ addUpdateForKey(upd, Clustering.EMPTY, params);
+ }
+ else
+ {
+ for (Clustering clustering : clusterings)
+ addUpdateForKey(upd, clustering, params);
+ }
+ }
}
- return mutations;
}
- public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
- CBuilder clustering,
- QueryOptions options,
- boolean local,
- long now)
- throws RequestExecutionException, RequestValidationException
+ private Slices createSlice(QueryOptions options)
+ {
+ SortedSet<Slice.Bound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
+ SortedSet<Slice.Bound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
+
+ return toSlices(startBounds, endBounds);
+ }
+
+ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+ NavigableSet<Clustering> clusterings,
+ QueryOptions options,
+ boolean local,
+ long now)
+ {
+ if (clusterings.contains(Clustering.STATIC_CLUSTERING))
+ return makeUpdateParameters(keys,
+ new ClusteringIndexSliceFilter(Slices.ALL, false),
+ options,
+ DataLimits.cqlLimits(1),
+ local,
+ now);
+
+ return makeUpdateParameters(keys,
+ new ClusteringIndexNamesFilter(clusterings, false),
+ options,
+ DataLimits.NONE,
+ local,
+ now);
+ }
+
+ private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
+ ClusteringIndexFilter filter,
+ QueryOptions options,
+ DataLimits limits,
+ boolean local,
+ long now)
{
// Some lists operation requires reading
- Map<DecoratedKey, Partition> lists = readRequiredLists(keys, clustering, local, options.getConsistency());
+ Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency());
return new UpdateParameters(cfm, updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists, true);
}
- /**
- * If there are conditions on the statement, this is called after the where clause and conditions have been
- * processed to check that they are compatible.
- * @throws InvalidRequestException
- */
- protected void validateWhereClauseForConditions() throws InvalidRequestException
+ private Slices toSlices(SortedSet<Slice.Bound> startBounds, SortedSet<Slice.Bound> endBounds)
{
- // no-op by default
+ assert startBounds.size() == endBounds.size();
+
+ Slices.Builder builder = new Slices.Builder(cfm.comparator);
+
+ Iterator<Slice.Bound> starts = startBounds.iterator();
+ Iterator<Slice.Bound> ends = endBounds.iterator();
+
+ while (starts.hasNext())
+ {
+ Slice slice = Slice.make(starts.next(), ends.next());
+ if (!slice.isEmpty(cfm.comparator))
+ {
+ builder.add(slice);
+ }
+ }
+
+ return builder.build();
}
public static abstract class Parsed extends CFStatement
{
- protected final Attributes.Raw attrs;
- protected final List<Pair<ColumnIdentifier.Raw, ColumnCondition.Raw>> conditions;
+ private final Attributes.Raw attrs;
+ private final List<Pair<ColumnIdentifier.Raw, ColumnCondition.Raw>> conditions;
private final boolean ifNotExists;
private final boolean ifExists;
@@ -867,7 +746,7 @@ public abstract class ModificationStatement implements CQLStatement
this.ifExists = ifExists;
}
- public ParsedStatement.Prepared prepare() throws InvalidRequestException
+ public ParsedStatement.Prepared prepare()
{
VariableSpecifications boundNames = getBoundVariables();
ModificationStatement statement = prepare(boundNames);
@@ -875,68 +754,118 @@ public abstract class ModificationStatement implements CQLStatement
return new ParsedStatement.Prepared(statement, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
}
- public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
+ public ModificationStatement prepare(VariableSpecifications boundNames)
{
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
preparedAttributes.collectMarkerSpecification(boundNames);
- ModificationStatement stmt = prepareInternal(metadata, boundNames, preparedAttributes);
+ Conditions preparedConditions = prepareConditions(metadata, boundNames);
- if (ifNotExists || ifExists || !conditions.isEmpty())
+ return prepareInternal(metadata,
+ boundNames,
+ preparedConditions,
+ preparedAttributes);
+ }
+
+ /**
+ * Returns the column conditions.
+ *
+ * @param metadata the column family meta data
+ * @param boundNames the bound names
+ * @return the column conditions.
+ */
+ private Conditions prepareConditions(CFMetaData metadata, VariableSpecifications boundNames)
+ {
+ // To have both 'IF EXISTS'/'IF NOT EXISTS' and some other conditions doesn't make sense.
+ // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes.
+ if (ifExists)
{
- if (stmt.isCounter())
- throw new InvalidRequestException("Conditional updates are not supported on counter tables");
+ assert conditions.isEmpty();
+ assert !ifNotExists;
+ return Conditions.IF_EXISTS_CONDITION;
+ }
- if (attrs.timestamp != null)
- throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
+ if (ifNotExists)
+ {
+ assert conditions.isEmpty();
+ assert !ifExists;
+ return Conditions.IF_NOT_EXISTS_CONDITION;
+ }
- if (ifNotExists)
- {
- // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
- // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes.
- assert conditions.isEmpty();
- assert !ifExists;
- stmt.setIfNotExistCondition();
- }
- else if (ifExists)
- {
- assert conditions.isEmpty();
- assert !ifNotExists;
- stmt.setIfExistCondition();
- }
- else
- {
- for (Pair<ColumnIdentifier.Raw, ColumnCondition.Raw> entry : conditions)
- {
- ColumnIdentifier id = entry.left.prepare(metadata);
- ColumnDefinition def = metadata.getColumnDefinition(id);
- if (def == null)
- throw new InvalidRequestException(String.format("Unknown identifier %s", id));
-
- ColumnCondition condition = entry.right.prepare(keyspace(), def);
- condition.collectMarkerSpecification(boundNames);
-
- switch (def.kind)
- {
- case PARTITION_KEY:
- case CLUSTERING:
- throw new InvalidRequestException(String.format("PRIMARY KEY column '%s' cannot have IF conditions", id));
- default:
- stmt.addCondition(condition);
- break;
- }
- }
- }
+ if (conditions.isEmpty())
+ return Conditions.EMPTY_CONDITION;
- stmt.validateWhereClauseForConditions();
+ return prepareColumnConditions(metadata, boundNames);
+ }
+
+ /**
+ * Returns the column conditions.
+ *
+ * @param metadata the column family meta data
+ * @param boundNames the bound names
+ * @return the column conditions.
+ */
+ private ColumnConditions prepareColumnConditions(CFMetaData metadata, VariableSpecifications boundNames)
+ {
+ checkNull(attrs.timestamp, "Cannot provide custom timestamp for conditional updates");
+
+ ColumnConditions.Builder builder = ColumnConditions.newBuilder();
+
+ for (Pair<ColumnIdentifier.Raw, ColumnCondition.Raw> entry : conditions)
+ {
+ ColumnIdentifier id = entry.left.prepare(metadata);
+ ColumnDefinition def = metadata.getColumnDefinition(id);
+ checkNotNull(metadata.getColumnDefinition(id), "Unknown identifier %s in IF conditions", id);
+
+ ColumnCondition condition = entry.right.prepare(keyspace(), def);
+ condition.collectMarkerSpecification(boundNames);
+
+ checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY column '%s' cannot have IF conditions", id);
+ builder.add(condition);
}
+ return builder.build();
+ }
- stmt.finishPreparation();
- return stmt;
+ protected abstract ModificationStatement prepareInternal(CFMetaData cfm,
+ VariableSpecifications boundNames,
+ Conditions conditions,
+ Attributes attrs);
+
+ /**
+ * Creates the restrictions.
+ *
+ * @param type the statement type
+ * @param cfm the column family meta data
+ * @param boundNames the bound names
+ * @param operations the column operations
+ * @param relations the where relations
+ * @param conditions the conditions
+ * @return the restrictions
+ */
+ protected static StatementRestrictions newRestrictions(StatementType type,
+ CFMetaData cfm,
+ VariableSpecifications boundNames,
+ Operations operations,
+ List<Relation> relations,
+ Conditions conditions)
+ {
+ boolean applyOnlyToStaticColumns = appliesOnlyToStaticColumns(operations, conditions);
+ return new StatementRestrictions(type, cfm, relations, boundNames, applyOnlyToStaticColumns, false, false);
}
- protected abstract ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
+ /**
+ * Retrieves the <code>ColumnDefinition</code> corresponding to the specified raw <code>ColumnIdentifier</code>.
+ *
+ * @param cfm the column family meta data
+ * @param rawId the raw <code>ColumnIdentifier</code>
+ * @return the <code>ColumnDefinition</code> corresponding to the specified raw <code>ColumnIdentifier</code>
+ */
+ protected static ColumnDefinition getColumnDefinition(CFMetaData cfm, Raw rawId)
+ {
+ ColumnIdentifier id = rawId.prepare(cfm);
+ return checkNotNull(cfm.getColumnDefinition(id), "Unknown identifier %s", id);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5d5dfea..2aac6ab 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -23,9 +23,10 @@ import java.util.*;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
+
import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -57,7 +58,6 @@ import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-
import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
@@ -156,7 +156,7 @@ public class SelectStatement implements CQLStatement
0,
defaultParameters,
selection,
- StatementRestrictions.empty(cfm),
+ StatementRestrictions.empty(StatementType.SELECT, cfm),
false,
null,
null);
@@ -790,7 +790,8 @@ public class SelectStatement implements CQLStatement
{
try
{
- return new StatementRestrictions(cfm,
+ return new StatementRestrictions(StatementType.SELECT,
+ cfm,
whereClause,
boundNames,
selection.containsOnlyStaticColumns(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/cql3/statements/StatementType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/StatementType.java b/src/java/org/apache/cassandra/cql3/statements/StatementType.java
new file mode 100644
index 0000000..d399931
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/StatementType.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+public enum StatementType
+{
+ INSERT
+ {
+ @Override
+ public boolean allowClusteringColumnSlices()
+ {
+ return false;
+ }
+ },
+ UPDATE
+ {
+
+ @Override
+ public boolean allowClusteringColumnSlices()
+ {
+ return false;
+ }
+ },
+ DELETE
+ {
+ },
+ SELECT
+ {
+ @Override
+ public boolean allowPartitionKeyRanges()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean allowNonPrimaryKeyInWhereClause()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean allowUseOfSecondaryIndices()
+ {
+ return true;
+ }
+ };
+
+ /**
+ * Checks if this type is an insert.
+ * @return <code>true</code> if this type is an insert, <code>false</code> otherwise.
+ */
+ public boolean isInsert()
+ {
+ return this == INSERT;
+ }
+
+ /**
+ * Checks if this type is an update.
+ * @return <code>true</code> if this type is an update, <code>false</code> otherwise.
+ */
+ public boolean isUpdate()
+ {
+ return this == UPDATE;
+ }
+
+ /**
+ * Checks if this type is a delete.
+ * @return <code>true</code> if this type is a delete, <code>false</code> otherwise.
+ */
+ public boolean isDelete()
+ {
+ return this == DELETE;
+ }
+
+ /**
+ * Checks if this type is a select.
+ * @return <code>true</code> if this type is a select, <code>false</code> otherwise.
+ */
+ public boolean isSelect()
+ {
+ return this == SELECT;
+ }
+
+ /**
+ * Checks this statement allow the where clause to contains missing partition key components or token relation.
+ * @return <code>true</code> if this statement allow the where clause to contains missing partition key components
+ * or token relation, <code>false</code> otherwise.
+ */
+ public boolean allowPartitionKeyRanges()
+ {
+ return false;
+ }
+
+ /**
+ * Checks this type of statement allow the where clause to contains clustering column slices.
+ * @return <code>true</code> if this type of statement allow the where clause to contains clustering column slices,
+ * <code>false</code> otherwise.
+ */
+ public boolean allowClusteringColumnSlices()
+ {
+ return true;
+ }
+
+ /**
+ * Checks if this type of statement allow non primary key in the where clause.
+ * @return <code>true</code> if this type of statement allow non primary key in the where clause,
+ * <code>false</code> otherwise.
+ */
+ public boolean allowNonPrimaryKeyInWhereClause()
+ {
+ return false;
+ }
+
+ /**
+ * Checks if this type of statement allow the use of secondary indices.
+ * @return <code>true</code> if this type of statement allow the use of secondary indices,
+ * <code>false</code> otherwise.
+ */
+ public boolean allowUseOfSecondaryIndices()
+ {
+ return false;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index e19deaa..8fa16e1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3.statements;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -24,14 +25,18 @@ import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.CBuilder;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.CompactTables;
+import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsNoDuplicates;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
+
/**
* An <code>UPDATE</code> statement parsed from a CQL query statement.
*
@@ -40,9 +45,15 @@ public class UpdateStatement extends ModificationStatement
{
private static final Constants.Value EMPTY = new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- private UpdateStatement(StatementType type, int boundTerms, CFMetaData cfm, Attributes attrs)
+ private UpdateStatement(StatementType type,
+ int boundTerms,
+ CFMetaData cfm,
+ Operations operations,
+ StatementRestrictions restrictions,
+ Conditions conditions,
+ Attributes attrs)
{
- super(type, boundTerms, cfm, attrs);
+ super(type, boundTerms, cfm, operations, restrictions, conditions, attrs);
}
public boolean requireFullClusteringKey()
@@ -50,22 +61,22 @@ public class UpdateStatement extends ModificationStatement
return true;
}
- public void addUpdateForKey(PartitionUpdate update, CBuilder cbuilder, UpdateParameters params)
- throws InvalidRequestException
+ @Override
+ public void addUpdateForKey(PartitionUpdate update, Clustering clustering, UpdateParameters params)
{
if (updatesRegularRows())
{
- params.newRow(cbuilder.build());
+ params.newRow(clustering);
// We update the row timestamp (ex-row marker) only on INSERT (#6782)
// Further, COMPACT tables semantic differs from "CQL3" ones in that a row exists only if it has
// a non-null column, so we don't want to set the row timestamp for them.
- if (type == StatementType.INSERT && cfm.isCQLTable())
+ if (type.isInsert() && cfm.isCQLTable())
params.addPrimaryKeyLivenessInfo();
List<Operation> updates = getRegularOperations();
- // For compact tablw, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update
+ // For compact table, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update
// that only sets the PK unless the is no declared non-PK columns (in the latter we just set the value empty).
// For a dense layout, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update
@@ -73,10 +84,11 @@ public class UpdateStatement extends ModificationStatement
// value is of type "EmptyType").
if (cfm.isCompactTable() && updates.isEmpty())
{
- if (CompactTables.hasEmptyCompactValue(cfm))
- updates = Collections.<Operation>singletonList(new Constants.Setter(cfm.compactValueColumn(), EMPTY));
- else
- throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfm.compactValueColumn().name));
+ checkTrue(CompactTables.hasEmptyCompactValue(cfm),
+ "Column %s is mandatory for this COMPACT STORAGE table",
+ cfm.compactValueColumn().name);
+
+ updates = Collections.<Operation>singletonList(new Constants.Setter(cfm.compactValueColumn(), EMPTY));
}
for (Operation op : updates)
@@ -96,6 +108,12 @@ public class UpdateStatement extends ModificationStatement
params.validateIndexedColumns(update);
}
+ @Override
+ public void addUpdateForKey(PartitionUpdate update, Slice slice, UpdateParameters params)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier.Raw> columnNames;
@@ -121,52 +139,63 @@ public class UpdateStatement extends ModificationStatement
this.columnValues = columnValues;
}
- protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+ @Override
+ protected ModificationStatement prepareInternal(CFMetaData cfm,
+ VariableSpecifications boundNames,
+ Conditions conditions,
+ Attributes attrs)
{
- UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.INSERT, boundNames.size(), cfm, attrs);
// Created from an INSERT
- if (stmt.isCounter())
- throw new InvalidRequestException("INSERT statements are not allowed on counter tables, use UPDATE instead");
+ checkFalse(cfm.isCounter(), "INSERT statements are not allowed on counter tables, use UPDATE instead");
- if (columnNames == null)
- throw new InvalidRequestException("Column names for INSERT must be provided when using VALUES");
- if (columnNames.isEmpty())
- throw new InvalidRequestException("No columns provided to INSERT");
- if (columnNames.size() != columnValues.size())
- throw new InvalidRequestException("Unmatched column names/values");
+ checkFalse(columnNames == null, "Column names for INSERT must be provided when using VALUES");
+ checkFalse(columnNames.isEmpty(), "No columns provided to INSERT");
+ checkFalse(columnNames.size() != columnValues.size(), "Unmatched column names/values");
+ checkContainsNoDuplicates(columnNames, "The column names contains duplicates");
+
+ List<Relation> relations = new ArrayList<>();
+ Operations operations = new Operations();
+ boolean hasClusteringColumnsSet = false;
- String ks = keyspace();
for (int i = 0; i < columnNames.size(); i++)
{
- ColumnIdentifier id = columnNames.get(i).prepare(cfm);
- ColumnDefinition def = cfm.getColumnDefinition(id);
- if (def == null)
- throw new InvalidRequestException(String.format("Unknown identifier %s", id));
+ ColumnDefinition def = getColumnDefinition(cfm, columnNames.get(i));
- for (int j = 0; j < i; j++)
- {
- ColumnIdentifier otherId = columnNames.get(j).prepare(cfm);
- if (id.equals(otherId))
- throw new InvalidRequestException(String.format("Multiple definitions found for column %s", id));
- }
+ if (def.isClusteringColumn())
+ hasClusteringColumnsSet = true;
Term.Raw value = columnValues.get(i);
+
if (def.isPrimaryKeyColumn())
{
- Term t = value.prepare(ks, def);
- t.collectMarkerSpecification(boundNames);
- stmt.addKeyValue(def, t);
+ relations.add(new SingleColumnRelation(columnNames.get(i), Operator.EQ, value));
}
else
{
- Operation operation = new Operation.SetValue(value).prepare(ks, def);
+ Operation operation = new Operation.SetValue(value).prepare(keyspace(), def);
operation.collectMarkerSpecification(boundNames);
- stmt.addOperation(operation);
+ operations.add(operation);
}
}
- return stmt;
+ boolean applyOnlyToStaticColumns = appliesOnlyToStaticColumns(operations, conditions) && !hasClusteringColumnsSet;
+
+ StatementRestrictions restrictions = new StatementRestrictions(StatementType.INSERT,
+ cfm,
+ relations,
+ boundNames,
+ applyOnlyToStaticColumns,
+ false,
+ false);
+
+ return new UpdateStatement(StatementType.INSERT,
+ boundNames.size(),
+ cfm,
+ operations,
+ restrictions,
+ conditions,
+ attrs);
}
}
@@ -183,24 +212,58 @@ public class UpdateStatement extends ModificationStatement
this.jsonValue = jsonValue;
}
- protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+ @Override
+ protected ModificationStatement prepareInternal(CFMetaData cfm,
+ VariableSpecifications boundNames,
+ Conditions conditions,
+ Attributes attrs)
{
- UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.INSERT, boundNames.size(), cfm, attrs);
- if (stmt.isCounter())
- throw new InvalidRequestException("INSERT statements are not allowed on counter tables, use UPDATE instead");
+ checkFalse(cfm.isCounter(), "INSERT statements are not allowed on counter tables, use UPDATE instead");
Collection<ColumnDefinition> defs = cfm.allColumns();
Json.Prepared prepared = jsonValue.prepareAndCollectMarkers(cfm, defs, boundNames);
+ List<Relation> relations = new ArrayList<>();
+ Operations operations = new Operations();
+ boolean hasClusteringColumnsSet = false;
+
for (ColumnDefinition def : defs)
{
+ if (def.isClusteringColumn())
+ hasClusteringColumnsSet = true;
+
+ Term.Raw raw = prepared.getRawTermForColumn(def);
if (def.isPrimaryKeyColumn())
- stmt.addKeyValue(def, prepared.getPrimaryKeyValueForColumn(def));
+ {
+ relations.add(new SingleColumnRelation(new ColumnIdentifier.ColumnIdentifierValue(def.name),
+ Operator.EQ,
+ raw));
+ }
else
- stmt.addOperation(prepared.getSetOperationForColumn(def));
+ {
+ Operation operation = new Operation.SetValue(raw).prepare(keyspace(), def);
+ operation.collectMarkerSpecification(boundNames);
+ operations.add(operation);
+ }
}
- return stmt;
+ boolean applyOnlyToStaticColumns = appliesOnlyToStaticColumns(operations, conditions) && !hasClusteringColumnsSet;
+
+ StatementRestrictions restrictions = new StatementRestrictions(StatementType.INSERT,
+ cfm,
+ relations,
+ boundNames,
+ applyOnlyToStaticColumns,
+ false,
+ false);
+
+ return new UpdateStatement(StatementType.INSERT,
+ boundNames.size(),
+ cfm,
+ operations,
+ restrictions,
+ conditions,
+ attrs);
}
}
@@ -232,32 +295,39 @@ public class UpdateStatement extends ModificationStatement
this.whereClause = whereClause;
}
- protected ModificationStatement prepareInternal(CFMetaData cfm, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
+ @Override
+ protected ModificationStatement prepareInternal(CFMetaData cfm,
+ VariableSpecifications boundNames,
+ Conditions conditions,
+ Attributes attrs)
{
- UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, boundNames.size(), cfm, attrs);
+ Operations operations = new Operations();
for (Pair<ColumnIdentifier.Raw, Operation.RawUpdate> entry : updates)
{
- ColumnDefinition def = cfm.getColumnDefinition(entry.left.prepare(cfm));
- if (def == null)
- throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
+ ColumnDefinition def = getColumnDefinition(cfm, entry.left);
+
+ checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY part %s found in SET part", def.name);
Operation operation = entry.right.prepare(keyspace(), def);
operation.collectMarkerSpecification(boundNames);
-
- switch (def.kind)
- {
- case PARTITION_KEY:
- case CLUSTERING:
- throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
- default:
- stmt.addOperation(operation);
- break;
- }
+ operations.add(operation);
}
- stmt.processWhereClause(whereClause, boundNames);
- return stmt;
+ StatementRestrictions restrictions = newRestrictions(StatementType.UPDATE,
+ cfm,
+ boundNames,
+ operations,
+ whereClause,
+ conditions);
+
+ return new UpdateStatement(StatementType.UPDATE,
+ boundNames.size(),
+ cfm,
+ operations,
+ restrictions,
+ conditions,
+ attrs);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
new file mode 100644
index 0000000..f291000
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.CounterMutation;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Utility class to collect updates.
+ *
+ * <p>In a batch statement we don't want to recreate mutations every time as this is particularly inefficient when
+ * applying multiple batch to the same partition (see #6737). </p>
+ *
+ */
+final class UpdatesCollector
+{
+ /**
+ * The columns that will be updated.
+ */
+ private final PartitionColumns updatedColumns;
+
+ /**
+ * The estimated number of updated row.
+ */
+ private final int updatedRows;
+
+ /**
+ * The mutations per keyspace.
+ */
+ private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
+
+ public UpdatesCollector(PartitionColumns updatedColumns, int updatedRows)
+ {
+ super();
+ this.updatedColumns = updatedColumns;
+ this.updatedRows = updatedRows;
+ }
+
+ /**
+ * Gets the <code>PartitionUpdate</code> for the specified column family and key. If the update does not
+ * exist it will be created.
+ *
+ * @param cfm the column family meta data
+ * @param dk the partition key
+ * @param consistency the consistency level
+ * @return the <code>PartitionUpdate</code> for the specified column family and key
+ */
+ public PartitionUpdate getPartitionUpdate(CFMetaData cfm, DecoratedKey dk, ConsistencyLevel consistency)
+ {
+ Mutation mut = getMutation(cfm, dk, consistency);
+ PartitionUpdate upd = mut.get(cfm);
+ if (upd == null)
+ {
+ upd = new PartitionUpdate(cfm, dk, updatedColumns, updatedRows);
+ mut.add(upd);
+ }
+ return upd;
+ }
+
+ private Mutation getMutation(CFMetaData cfm, DecoratedKey dk, ConsistencyLevel consistency)
+ {
+ String ksName = cfm.ksName;
+ IMutation mutation = keyspaceMap(ksName).get(dk.getKey());
+ if (mutation == null)
+ {
+ Mutation mut = new Mutation(ksName, dk);
+ mutation = cfm.isCounter() ? new CounterMutation(mut, consistency) : mut;
+ keyspaceMap(ksName).put(dk.getKey(), mutation);
+ return mut;
+ }
+ return cfm.isCounter() ? ((CounterMutation) mutation).getMutation() : (Mutation) mutation;
+ }
+
+ /**
+ * Returns a collection containing all the mutations.
+ * @return a collection containing all the mutations.
+ */
+ public Collection<IMutation> toMutations()
+ {
+ // The case where all statement where on the same keyspace is pretty common
+ if (mutations.size() == 1)
+ return mutations.values().iterator().next().values();
+
+ List<IMutation> ms = new ArrayList<>();
+ for (Map<ByteBuffer, IMutation> ksMap : mutations.values())
+ ms.addAll(ksMap.values());
+
+ return ms;
+ }
+
+ /**
+ * Returns the key-mutation mappings for the specified keyspace.
+ *
+ * @param ksName the keyspace name
+ * @return the key-mutation mappings for the specified keyspace.
+ */
+ private Map<ByteBuffer, IMutation> keyspaceMap(String ksName)
+ {
+ Map<ByteBuffer, IMutation> ksMap = mutations.get(ksName);
+ if (ksMap == null)
+ {
+ ksMap = new HashMap<>();
+ mutations.put(ksName, ksMap);
+ }
+ return ksMap;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/db/CBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java
index fe130dc..94feb93 100644
--- a/src/java/org/apache/cassandra/db/CBuilder.java
+++ b/src/java/org/apache/cassandra/db/CBuilder.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -193,17 +192,17 @@ public abstract class CBuilder
public Clustering buildWith(ByteBuffer value)
{
- assert size+1 == type.size();
+ assert size+1 <= type.size();
- ByteBuffer[] newValues = Arrays.copyOf(values, size+1);
+ ByteBuffer[] newValues = Arrays.copyOf(values, type.size());
newValues[size] = value;
return new Clustering(newValues);
}
public Clustering buildWith(List<ByteBuffer> newValues)
{
- assert size + newValues.size() == type.size();
- ByteBuffer[] buffers = Arrays.copyOf(values, size + newValues.size());
+ assert size + newValues.size() <= type.size();
+ ByteBuffer[] buffers = Arrays.copyOf(values, type.size());
int newSize = size;
for (ByteBuffer value : newValues)
buffers[newSize++] = value;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 8865b0f..54199ab 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
@@ -27,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.memory.AbstractAllocator;
+
/**
* A range tombstone is a tombstone that covers a slice/range of rows.
* <p>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
index bde9d96..db34c86 100644
--- a/src/java/org/apache/cassandra/db/Slices.java
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -154,6 +154,15 @@ public abstract class Slices implements Iterable<Slice>
public abstract String toCQLString(CFMetaData metadata);
/**
+ * Checks if this <code>Slices</code> is empty.
+ * @return <code>true</code> if this <code>Slices</code> is empty, <code>false</code> otherwise.
+ */
+ public final boolean isEmpty()
+ {
+ return size() == 0;
+ }
+
+ /**
* In simple object that allows to test the inclusion of rows in those slices assuming those rows
* are passed (to {@link #includes}) in clustering order (or reverse clustering ordered, depending
* of the argument passed to {@link #inOrderTester}).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 7ae5651..4a2af66 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -206,7 +206,7 @@ public class CQLSSTableWriter implements Closeable
QueryOptions options = QueryOptions.forInternalCalls(null, values);
List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
- CBuilder clustering = insert.createClustering(options);
+ SortedSet<Clustering> clusterings = insert.createClustering(options);
long now = System.currentTimeMillis() * 1000;
// Note that we asks indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
@@ -222,7 +222,10 @@ public class CQLSSTableWriter implements Closeable
try
{
for (ByteBuffer key : keys)
- insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
+ {
+ for (Clustering clustering : clusterings)
+ insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params);
+ }
return this;
}
catch (SSTableSimpleUnsortedWriter.SyncException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
index 6442a11..75379fb 100644
--- a/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MaterializedViewTest.java
@@ -448,6 +448,39 @@ public class MaterializedViewTest extends CQLTester
}
@Test
+ public void testRangeTombstone3() throws Throwable
+ {
+ createTable("CREATE TABLE %s (" +
+ "k int, " +
+ "asciival ascii, " +
+ "bigintval bigint, " +
+ "textval1 text, " +
+ "PRIMARY KEY((k, asciival), bigintval)" +
+ ")");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE textval1 IS NOT NULL AND k IS NOT NULL AND asciival IS NOT NULL AND bigintval IS NOT NULL PRIMARY KEY ((textval1, k), asciival, bigintval)");
+
+ for (int i = 0; i < 100; i++)
+ updateMV("INSERT into %s (k,asciival,bigintval,textval1)VALUES(?,?,?,?)", 0, "foo", (long) i % 2, "bar" + i);
+
+ Assert.assertEquals(1, execute("select * from %s where k = 0 and asciival = 'foo' and bigintval = 0").size());
+ Assert.assertEquals(1, execute("select * from %s where k = 0 and asciival = 'foo' and bigintval = 1").size());
+
+
+ Assert.assertEquals(2, execute("select * from %s").size());
+ Assert.assertEquals(2, execute("select * from mv").size());
+
+ //Write a RT and verify the data is removed from index
+ updateMV("DELETE FROM %s WHERE k = ? AND asciival = ? and bigintval >= ?", 0, "foo", 0L);
+
+ Assert.assertEquals(0, execute("select * from %s").size());
+ Assert.assertEquals(0, execute("select * from mv").size());
+ }
+
+ @Test
public void testCompoundPartitionKey() throws Throwable
{
createTable("CREATE TABLE %s (" +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e3727e3/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
index 25fe227..6993bec 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFAuthTest.java
@@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.*;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -49,8 +47,6 @@ import static org.junit.Assert.fail;
public class UFAuthTest extends CQLTester
{
- private static final Logger logger = LoggerFactory.getLogger(UFAuthTest.class);
-
String roleName = "test_role";
AuthenticatedUser user;
RoleResource role;
@@ -319,14 +315,14 @@ public class UFAuthTest extends CQLTester
public void systemFunctionsRequireNoExplicitPrivileges() throws Throwable
{
// with terminal arguments, so evaluated at prepare time
- String cql = String.format("UPDATE %s SET v2 = 0 WHERE k = blobasint(intasblob(0))",
+ String cql = String.format("UPDATE %s SET v2 = 0 WHERE k = blobasint(intasblob(0)) and v1 = 0",
KEYSPACE + "." + currentTable());
getStatement(cql).checkAccess(clientState);
// with non-terminal arguments, so evaluated at execution
String functionName = createSimpleFunction();
grantExecuteOnFunction(functionName);
- cql = String.format("UPDATE %s SET v2 = 0 WHERE k = blobasint(intasblob(%s))",
+ cql = String.format("UPDATE %s SET v2 = 0 WHERE k = blobasint(intasblob(%s)) and v1 = 0",
KEYSPACE + "." + currentTable(),
functionCall(functionName));
getStatement(cql).checkAccess(clientState);