You are viewing a plain text version of this content. The canonical link for it (shown as "here" in the original page) is not preserved in this plain-text version.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:10 UTC
[46/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a. CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index ff685cf..b4d7853 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.cql3.statements;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterators;
@@ -27,7 +26,8 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.Pair;
@@ -46,44 +46,57 @@ public class DeleteStatement extends ModificationStatement
return false;
}
- public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, Composite prefix, UpdateParameters params)
+ public void addUpdateForKey(PartitionUpdate update, CBuilder cbuilder, UpdateParameters params)
throws InvalidRequestException
{
- List<Operation> deletions = getOperations();
+ List<Operation> regularDeletions = getRegularOperations();
+ List<Operation> staticDeletions = getStaticOperations();
- if (prefix.size() < cfm.clusteringColumns().size() && !deletions.isEmpty())
+ if (regularDeletions.isEmpty() && staticDeletions.isEmpty())
{
- // In general, we can't delete specific columns if not all clustering columns have been specified.
- // However, if we delete only static colums, it's fine since we won't really use the prefix anyway.
- for (Operation deletion : deletions)
- if (!deletion.column.isStatic())
- throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, deletion.column.name));
- }
-
- if (deletions.isEmpty())
- {
- // We delete the slice selected by the prefix.
- // However, for performance reasons, we distinguish 2 cases:
- // - It's a full internal row delete
- // - It's a full cell name (i.e it's a dense layout and the prefix is full)
- if (prefix.isEmpty())
+ // We're not deleting any specific columns so it's either a full partition deletion ....
+ if (cbuilder.count() == 0)
{
- // No columns specified, delete the row
- cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
+ update.addPartitionDeletion(params.deletionTime());
}
- else if (cfm.comparator.isDense() && prefix.size() == cfm.clusteringColumns().size())
+ // ... or a row deletion ...
+ else if (cbuilder.remainingCount() == 0)
{
- cf.addAtom(params.makeTombstone(cfm.comparator.create(prefix, null)));
+ Clustering clustering = cbuilder.build();
+ Row.Writer writer = update.writer();
+ params.writeClustering(clustering, writer);
+ params.writeRowDeletion(writer);
+ writer.endOfRow();
}
+ // ... or a range of rows deletion.
else
{
- cf.addAtom(params.makeRangeTombstone(prefix.slice()));
+ update.addRangeTombstone(params.makeRangeTombstone(cbuilder));
}
}
else
{
- for (Operation op : deletions)
- op.execute(key, cf, prefix, params);
+ if (!regularDeletions.isEmpty())
+ {
+ // We can't delete specific (regular) columns if not all clustering columns have been specified.
+ if (cbuilder.remainingCount() > 0)
+ throw new InvalidRequestException(String.format("Primary key column '%s' must be specified in order to delete column '%s'", getFirstEmptyKey().name, regularDeletions.get(0).column.name));
+
+ Clustering clustering = cbuilder.build();
+ Row.Writer writer = update.writer();
+ params.writeClustering(clustering, writer);
+ for (Operation op : regularDeletions)
+ op.execute(update.partitionKey(), clustering, writer, params);
+ writer.endOfRow();
+ }
+
+ if (!staticDeletions.isEmpty())
+ {
+ Row.Writer writer = update.staticWriter();
+ for (Operation op : staticDeletions)
+ op.execute(update.partitionKey(), Clustering.STATIC_CLUSTERING, writer, params);
+ writer.endOfRow();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
index ed10d00..8ad4f6c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTypeStatement.java
@@ -119,12 +119,6 @@ public class DropTypeStatement extends SchemaAlteringStatement
if (isUsedBy(subtype))
return true;
}
- else if (toCheck instanceof ColumnToCollectionType)
- {
- for (CollectionType collection : ((ColumnToCollectionType)toCheck).defined.values())
- if (isUsedBy(collection))
- return true;
- }
else if (toCheck instanceof CollectionType)
{
if (toCheck instanceof ListType)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 888cdb7..6a6d186 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,7 +21,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
@@ -33,19 +34,20 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompositesBuilder;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.triggers.TriggerExecutor;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -58,6 +60,8 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.invalidReq
*/
public abstract class ModificationStatement implements CQLStatement
{
+ protected static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+
private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
public static enum StatementType { INSERT, UPDATE, DELETE }
@@ -68,7 +72,15 @@ public abstract class ModificationStatement implements CQLStatement
public final Attributes attrs;
protected final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<>();
- private final List<Operation> columnOperations = new ArrayList<Operation>();
+ private final List<Operation> regularOperations = new ArrayList<>();
+ private final List<Operation> staticOperations = new ArrayList<>();
+
+ // TODO: If we had a builder for this statement, we could have updatedColumns/conditionColumns final and only have
+ // updatedColumnsBuilder/conditionColumnsBuilder in the builder ...
+ private PartitionColumns updatedColumns;
+ private PartitionColumns.Builder updatedColumnsBuilder = PartitionColumns.builder();
+ private PartitionColumns conditionColumns;
+ private PartitionColumns.Builder conditionColumnsBuilder = PartitionColumns.builder();
// Separating normal and static conditions makes things somewhat easier
private List<ColumnCondition> columnConditions;
@@ -103,25 +115,24 @@ public abstract class ModificationStatement implements CQLStatement
Iterable<Function> functions = attrs.getFunctions();
for (Restriction restriction : processedKeys.values())
- functions = Iterables.concat(functions, restriction.getFunctions());
-
- if (columnOperations != null)
- for (Operation operation : columnOperations)
- functions = Iterables.concat(functions, operation.getFunctions());
+ functions = Iterables.concat(functions, restriction.getFunctions());
- if (columnConditions != null)
- for (ColumnCondition condition : columnConditions)
- functions = Iterables.concat(functions, condition.getFunctions());
+ for (Operation operation : allOperations())
+ functions = Iterables.concat(functions, operation.getFunctions());
- if (staticConditions != null)
- for (ColumnCondition condition : staticConditions)
- functions = Iterables.concat(functions, condition.getFunctions());
+ for (ColumnCondition condition : allConditions())
+ functions = Iterables.concat(functions, condition.getFunctions());
return functions;
}
+ public boolean hasNoClusteringColumns()
+ {
+ return hasNoClusteringColumns;
+ }
+
public abstract boolean requireFullClusteringKey();
- public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, Composite prefix, UpdateParameters params) throws InvalidRequestException;
+ public abstract void addUpdateForKey(PartitionUpdate update, CBuilder clusteringBuilder, UpdateParameters params) throws InvalidRequestException;
public int getBoundTerms()
{
@@ -184,16 +195,75 @@ public abstract class ModificationStatement implements CQLStatement
public void addOperation(Operation op)
{
+ updatedColumnsBuilder.add(op.column);
+ // If the operation requires a read-before-write and we're doing a conditional read, we want to read
+ // the affected column as part of the read-for-conditions paxos phase (see #7499).
+ if (op.requiresRead())
+ conditionColumnsBuilder.add(op.column);
+
if (op.column.isStatic())
+ {
setsStaticColumns = true;
+ staticOperations.add(op);
+ }
else
+ {
setsRegularColumns = true;
- columnOperations.add(op);
+ regularOperations.add(op);
+ }
+ }
+
+ public PartitionColumns updatedColumns()
+ {
+ return updatedColumns;
+ }
+
+ public PartitionColumns conditionColumns()
+ {
+ return conditionColumns;
}
- public List<Operation> getOperations()
+ public boolean updatesRegularRows()
{
- return columnOperations;
+ // We're updating regular rows if all the clustering columns are provided.
+ // Note that the only case where we're allowed not to provide clustering
+ // columns is if we set some static columns, and in that case no clustering
+ // columns should be given. So in practice, it's enough to check whether
+ // the table has no clustering columns or at least one of them is set.
+ return cfm.clusteringColumns().isEmpty() || !hasNoClusteringColumns;
+ }
+
+ public boolean updatesStaticRow()
+ {
+ return !staticOperations.isEmpty();
+ }
+
+ private void finishPreparation()
+ {
+ updatedColumns = updatedColumnsBuilder.build();
+ // Compact tables have no row marker. So if we don't actually update any particular column,
+ // this means that we're only updating the PK, which is only allowed when the PK columns are the
+ // only ones declared in the definition. In that case, however, we do want to write the compactValueColumn
+ // (since, again, we can't use a "row marker"), so add it automatically via cfm.partitionColumns().
+ if (cfm.isCompactTable() && updatedColumns.isEmpty() && updatesRegularRows())
+ updatedColumns = cfm.partitionColumns();
+
+ conditionColumns = conditionColumnsBuilder.build();
+ }
+
+ public List<Operation> getRegularOperations()
+ {
+ return regularOperations;
+ }
+
+ public List<Operation> getStaticOperations()
+ {
+ return staticOperations;
+ }
+
+ public Iterable<Operation> allOperations()
+ {
+ return Iterables.concat(staticOperations, regularOperations);
}
public Iterable<ColumnDefinition> getColumnsWithConditions()
@@ -205,8 +275,19 @@ public abstract class ModificationStatement implements CQLStatement
staticConditions == null ? Collections.<ColumnDefinition>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
}
+ public Iterable<ColumnCondition> allConditions()
+ {
+ if (staticConditions == null)
+ return columnConditions == null ? Collections.<ColumnCondition>emptySet(): columnConditions;
+ if (columnConditions == null)
+ return staticConditions;
+ return Iterables.concat(staticConditions, columnConditions);
+ }
+
public void addCondition(ColumnCondition cond)
{
+ conditionColumnsBuilder.add(cond.column);
+
List<ColumnCondition> conds = null;
if (cond.column.isStatic())
{
@@ -255,7 +336,7 @@ public abstract class ModificationStatement implements CQLStatement
public void addKeyValue(ColumnDefinition def, Term value) throws InvalidRequestException
{
- addKeyValues(def, new SingleColumnRestriction.EQ(def, value));
+ addKeyValues(def, new SingleColumnRestriction.EQRestriction(def, value));
}
public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
@@ -303,26 +384,25 @@ public abstract class ModificationStatement implements CQLStatement
public List<ByteBuffer> buildPartitionKeyNames(QueryOptions options)
throws InvalidRequestException
{
- CompositesBuilder keyBuilder = new CompositesBuilder(cfm.getKeyValidatorAsCType());
+ MultiCBuilder keyBuilder = MultiCBuilder.create(cfm.getKeyValidatorAsClusteringComparator());
for (ColumnDefinition def : cfm.partitionKeyColumns())
{
Restriction r = checkNotNull(processedKeys.get(def.name), "Missing mandatory PRIMARY KEY part %s", def.name);
r.appendTo(keyBuilder, options);
}
- return Lists.transform(keyBuilder.build(), new com.google.common.base.Function<Composite, ByteBuffer>()
+ NavigableSet<Clustering> clusterings = keyBuilder.build();
+ List<ByteBuffer> keys = new ArrayList<ByteBuffer>(clusterings.size());
+ for (Clustering clustering : clusterings)
{
- @Override
- public ByteBuffer apply(Composite composite)
- {
- ByteBuffer byteBuffer = composite.toByteBuffer();
- ThriftValidation.validateKey(cfm, byteBuffer);
- return byteBuffer;
- }
- });
+ ByteBuffer key = CFMetaData.serializePartitionKey(clustering);
+ ThriftValidation.validateKey(cfm, key);
+ keys.add(key);
+ }
+ return keys;
}
- public Composite createClusteringPrefix(QueryOptions options)
+ public CBuilder createClustering(QueryOptions options)
throws InvalidRequestException
{
// If the only updated/deleted columns are static, then we don't need clustering columns.
@@ -339,7 +419,7 @@ public abstract class ModificationStatement implements CQLStatement
{
// If we set no non-static columns, then it's fine not to have clustering columns
if (hasNoClusteringColumns)
- return cfm.comparator.staticPrefix();
+ return CBuilder.STATIC_BUILDER;
// If we do have clustering columns however, then either it's an INSERT and the query is valid
// but we still need to build a proper prefix, or it's not an INSERT, and then we want to reject
@@ -354,13 +434,15 @@ public abstract class ModificationStatement implements CQLStatement
}
}
- return createClusteringPrefixBuilderInternal(options);
+ return createClusteringInternal(options);
}
- private Composite createClusteringPrefixBuilderInternal(QueryOptions options)
+ private CBuilder createClusteringInternal(QueryOptions options)
throws InvalidRequestException
{
- CompositesBuilder builder = new CompositesBuilder(cfm.comparator);
+ CBuilder builder = CBuilder.create(cfm.comparator);
+ MultiCBuilder multiBuilder = MultiCBuilder.wrap(builder);
+
ColumnDefinition firstEmptyKey = null;
for (ColumnDefinition def : cfm.clusteringColumns())
{
@@ -368,7 +450,7 @@ public abstract class ModificationStatement implements CQLStatement
if (r == null)
{
firstEmptyKey = def;
- checkFalse(requireFullClusteringKey() && !cfm.comparator.isDense() && cfm.comparator.isCompound(),
+ checkFalse(requireFullClusteringKey() && !cfm.isDense() && cfm.isCompound(),
"Missing mandatory PRIMARY KEY part %s", def.name);
}
else if (firstEmptyKey != null)
@@ -377,10 +459,10 @@ public abstract class ModificationStatement implements CQLStatement
}
else
{
- r.appendTo(builder, options);
+ r.appendTo(multiBuilder, options);
}
}
- return builder.build().get(0); // We only allow IN for row keys so far
+ return builder;
}
protected ColumnDefinition getFirstEmptyKey()
@@ -396,14 +478,14 @@ public abstract class ModificationStatement implements CQLStatement
public boolean requiresRead()
{
// Lists SET operation incurs a read.
- for (Operation op : columnOperations)
+ for (Operation op : allOperations())
if (op.requiresRead())
return true;
return false;
}
- protected Map<ByteBuffer, CQL3Row> readRequiredRows(Collection<ByteBuffer> partitionKeys, Composite clusteringPrefix, boolean local, ConsistencyLevel cl)
+ protected Map<DecoratedKey, Partition> readRequiredLists(Collection<ByteBuffer> partitionKeys, CBuilder cbuilder, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
if (!requiresRead())
@@ -418,32 +500,54 @@ public abstract class ModificationStatement implements CQLStatement
throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
}
- ColumnSlice[] slices = new ColumnSlice[]{ clusteringPrefix.slice() };
- List<ReadCommand> commands = new ArrayList<ReadCommand>(partitionKeys.size());
- long now = System.currentTimeMillis();
+ // TODO: no point in recomputing that every time. Should move to preparation phase.
+ PartitionColumns.Builder builder = PartitionColumns.builder();
+ for (Operation op : allOperations())
+ if (op.requiresRead())
+ builder.add(op.column);
+
+ PartitionColumns toRead = builder.build();
+
+ NavigableSet<Clustering> clusterings = FBUtilities.singleton(cbuilder.build(), cfm.comparator);
+ List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(partitionKeys.size());
+ int nowInSec = FBUtilities.nowInSeconds();
for (ByteBuffer key : partitionKeys)
- commands.add(new SliceFromReadCommand(keyspace(),
- key,
- columnFamily(),
- now,
- new SliceQueryFilter(slices, false, Integer.MAX_VALUE)));
-
- List<Row> rows = local
- ? SelectStatement.readLocally(keyspace(), commands)
- : StorageProxy.read(commands, cl);
-
- Map<ByteBuffer, CQL3Row> map = new HashMap<ByteBuffer, CQL3Row>();
- for (Row row : rows)
+ commands.add(new SinglePartitionNamesCommand(cfm,
+ nowInSec,
+ ColumnFilter.selection(toRead),
+ RowFilter.NONE,
+ DataLimits.NONE,
+ StorageService.getPartitioner().decorateKey(key),
+ new ClusteringIndexNamesFilter(clusterings, false)));
+
+ Map<DecoratedKey, Partition> map = new HashMap();
+
+ SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
+
+ if (local)
+ {
+ try (ReadOrderGroup orderGroup = group.startOrderGroup(); PartitionIterator iter = group.executeInternal(orderGroup))
+ {
+ return asMaterializedMap(iter);
+ }
+ }
+ else
{
- if (row.cf == null || row.cf.isEmpty())
- continue;
+ try (PartitionIterator iter = group.execute(cl, null))
+ {
+ return asMaterializedMap(iter);
+ }
+ }
+ }
- Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
- if (iter.hasNext())
+ private Map<DecoratedKey, Partition> asMaterializedMap(PartitionIterator iterator)
+ {
+ Map<DecoratedKey, Partition> map = new HashMap();
+ while (iterator.hasNext())
+ {
+ try (RowIterator partition = iterator.next())
{
- map.put(row.key.getKey(), iter.next());
- // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
- assert !iter.hasNext();
+ map.put(partition.partitionKey(), FilteredPartition.create(partition));
}
}
return map;
@@ -492,14 +596,16 @@ public abstract class ModificationStatement implements CQLStatement
{
CQL3CasRequest request = makeCasRequest(queryState, options);
- ColumnFamily result = StorageProxy.cas(keyspace(),
- columnFamily(),
- request.key,
- request,
- options.getSerialConsistency(),
- options.getConsistency(),
- queryState.getClientState());
- return new ResultMessage.Rows(buildCasResultSet(request.key, result, options));
+ try (RowIterator result = StorageProxy.cas(keyspace(),
+ columnFamily(),
+ request.key,
+ request,
+ options.getSerialConsistency(),
+ options.getConsistency(),
+ queryState.getClientState()))
+ {
+ return new ResultMessage.Rows(buildCasResultSet(result, options));
+ }
}
private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions options)
@@ -509,54 +615,54 @@ public abstract class ModificationStatement implements CQLStatement
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
- ByteBuffer key = keys.get(0);
+ DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0));
long now = options.getTimestamp(queryState);
- Composite prefix = createClusteringPrefix(options);
+ CBuilder cbuilder = createClustering(options);
- CQL3CasRequest request = new CQL3CasRequest(cfm, key, false);
- addConditions(prefix, request, options);
- request.addRowUpdate(prefix, this, options, now);
+ CQL3CasRequest request = new CQL3CasRequest(cfm, key, false, conditionColumns(), updatesRegularRows(), updatesStaticRow());
+ addConditions(cbuilder.build(), request, options);
+ request.addRowUpdate(cbuilder, this, options, now);
return request;
}
- public void addConditions(Composite clusteringPrefix, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
+ public void addConditions(Clustering clustering, CQL3CasRequest request, QueryOptions options) throws InvalidRequestException
{
if (ifNotExists)
{
// If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
- // columns and the prefix should be the clusteringPrefix. But if only static columns are set, then the ifNotExists apply to the existence
+ // columns and the prefix should be the clustering. But if only static columns are set, then the ifNotExists applies to the existence
// of any static columns and we should use the prefix for the "static part" of the partition.
- request.addNotExist(clusteringPrefix);
+ request.addNotExist(clustering);
}
else if (ifExists)
{
- request.addExist(clusteringPrefix);
+ request.addExist(clustering);
}
else
{
if (columnConditions != null)
- request.addConditions(clusteringPrefix, columnConditions, options);
+ request.addConditions(clustering, columnConditions, options);
if (staticConditions != null)
- request.addConditions(cfm.comparator.staticPrefix(), staticConditions, options);
+ request.addConditions(Clustering.STATIC_CLUSTERING, staticConditions, options);
}
}
- private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf, QueryOptions options) throws InvalidRequestException
+ private ResultSet buildCasResultSet(RowIterator partition, QueryOptions options) throws InvalidRequestException
{
- return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false, options);
+ return buildCasResultSet(keyspace(), columnFamily(), partition, getColumnsWithConditions(), false, options);
}
- public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
+ public static ResultSet buildCasResultSet(String ksName, String tableName, RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
throws InvalidRequestException
{
- boolean success = cf == null;
+ boolean success = partition == null;
- ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
+ ColumnSpecification spec = new ColumnSpecification(ksName, tableName, CAS_RESULT_COLUMN, BooleanType.instance);
ResultSet.ResultMetadata metadata = new ResultSet.ResultMetadata(Collections.singletonList(spec));
List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
ResultSet rs = new ResultSet(metadata, rows);
- return success ? rs : merge(rs, buildCasFailureResultSet(key, cf, columnsWithConditions, isBatch, options));
+ return success ? rs : merge(rs, buildCasFailureResultSet(partition, columnsWithConditions, isBatch, options));
}
private static ResultSet merge(ResultSet left, ResultSet right)
@@ -582,10 +688,10 @@ public abstract class ModificationStatement implements CQLStatement
return new ResultSet(new ResultSet.ResultMetadata(specs), rows);
}
- private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
+ private static ResultSet buildCasFailureResultSet(RowIterator partition, Iterable<ColumnDefinition> columnsWithConditions, boolean isBatch, QueryOptions options)
throws InvalidRequestException
{
- CFMetaData cfm = cf.metadata();
+ CFMetaData cfm = partition.metadata();
Selection selection;
if (columnsWithConditions == null)
{
@@ -609,9 +715,8 @@ public abstract class ModificationStatement implements CQLStatement
}
- long now = System.currentTimeMillis();
- Selection.ResultSetBuilder builder = selection.resultSetBuilder(now, false);
- SelectStatement.forSelection(cfm, selection).processColumnFamily(key, cf, options, now, builder);
+ Selection.ResultSetBuilder builder = selection.resultSetBuilder(false);
+ SelectStatement.forSelection(cfm, selection).processPartition(partition, options, builder, FBUtilities.nowInSeconds());
return builder.build(options.getProtocolVersion());
}
@@ -640,31 +745,31 @@ public abstract class ModificationStatement implements CQLStatement
public ResultMessage executeInternalWithCondition(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException
{
CQL3CasRequest request = makeCasRequest(state, options);
- ColumnFamily result = casInternal(request, state);
- return new ResultMessage.Rows(buildCasResultSet(request.key, result, options));
+ try (RowIterator result = casInternal(request, state))
+ {
+ return new ResultMessage.Rows(buildCasResultSet(result, options));
+ }
}
- static ColumnFamily casInternal(CQL3CasRequest request, QueryState state)
+ static RowIterator casInternal(CQL3CasRequest request, QueryState state)
{
UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
CFMetaData metadata = Schema.instance.getCFMetaData(request.cfm.ksName, request.cfm.cfName);
- ReadCommand readCommand = ReadCommand.create(request.cfm.ksName, request.key, request.cfm.cfName, request.now, request.readFilter());
- Keyspace keyspace = Keyspace.open(request.cfm.ksName);
-
- Row row = readCommand.getRow(keyspace);
- ColumnFamily current = row.cf;
- if (!request.appliesTo(current))
+ SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+ FilteredPartition current;
+ try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup))
{
- if (current == null)
- current = ArrayBackedSortedColumns.factory.create(metadata);
- return current;
+ current = FilteredPartition.create(PartitionIterators.getOnlyElement(iter, readCommand));
}
- ColumnFamily updates = request.makeUpdates(current);
- updates = TriggerExecutor.instance.execute(request.key, updates);
+ if (!request.appliesTo(current))
+ return current.rowIterator();
+
+ PartitionUpdate updates = request.makeUpdates(current);
+ updates = TriggerExecutor.instance.execute(updates);
- Commit proposal = Commit.newProposal(request.key, ballot, updates);
+ Commit proposal = Commit.newProposal(ballot, updates);
proposal.makeMutation().apply();
return null;
}
@@ -683,17 +788,18 @@ public abstract class ModificationStatement implements CQLStatement
throws RequestExecutionException, RequestValidationException
{
List<ByteBuffer> keys = buildPartitionKeyNames(options);
- Composite clusteringPrefix = createClusteringPrefix(options);
+ CBuilder clustering = createClustering(options);
- UpdateParameters params = makeUpdateParameters(keys, clusteringPrefix, options, local, now);
+ UpdateParameters params = makeUpdateParameters(keys, clustering, options, local, now);
Collection<IMutation> mutations = new ArrayList<IMutation>(keys.size());
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
- addUpdateForKey(cf, key, clusteringPrefix, params);
- Mutation mut = new Mutation(cfm.ksName, key, cf);
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+ PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1);
+ addUpdateForKey(upd, clustering, params);
+ Mutation mut = new Mutation(upd);
mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
}
@@ -701,15 +807,15 @@ public abstract class ModificationStatement implements CQLStatement
}
public UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
- Composite prefix,
+ CBuilder clustering,
QueryOptions options,
boolean local,
long now)
throws RequestExecutionException, RequestValidationException
{
// Some lists operation requires reading
- Map<ByteBuffer, CQL3Row> rows = readRequiredRows(keys, prefix, local, options.getConsistency());
- return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), rows);
+ Map<DecoratedKey, Partition> lists = readRequiredLists(keys, clustering, local, options.getConsistency());
+ return new UpdateParameters(cfm, options, getTimestamp(now, options), getTimeToLive(options), lists, true);
}
/**
@@ -803,6 +909,8 @@ public abstract class ModificationStatement implements CQLStatement
stmt.validateWhereClauseForConditions();
}
+
+ stmt.finishPreparation();
return stmt;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index e2708cd..6a7f429 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -23,7 +23,8 @@ import java.util.*;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
@@ -34,7 +35,8 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.selection.RawSelector;
import org.apache.cassandra.cql3.selection.Selection;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.CollectionType;
@@ -43,12 +45,9 @@ import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.*;
import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.service.pager.QueryPagers;
+import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,6 +70,8 @@ import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
*/
public class SelectStatement implements CQLStatement
{
+ private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
+
private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
private final int boundTerms;
@@ -88,6 +89,8 @@ public class SelectStatement implements CQLStatement
*/
private final Comparator<List<ByteBuffer>> orderingComparator;
+ private final ColumnFilter queriedColumns;
+
// Used by forSelection below
private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier.Raw, Boolean>emptyMap(), false, false, false);
@@ -108,6 +111,7 @@ public class SelectStatement implements CQLStatement
this.orderingComparator = orderingComparator;
this.parameters = parameters;
this.limit = limit;
+ this.queriedColumns = gatherQueriedColumns();
}
public Iterable<Function> getFunctions()
@@ -117,6 +121,23 @@ public class SelectStatement implements CQLStatement
limit != null ? limit.getFunctions() : Collections.<Function>emptySet());
}
+ // Note that the columns queried internally differ from the ones selected by the
+ // user, as they also include any column on which we have a restriction.
+ private ColumnFilter gatherQueriedColumns()
+ {
+ if (selection.isWildcard())
+ return ColumnFilter.all(cfm);
+
+ ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfm);
+ // Adds all selected columns
+ for (ColumnDefinition def : selection.getColumns())
+ if (!def.isPrimaryKeyColumn())
+ builder.add(def);
+ // as well as any restricted column (so we can actually apply the restriction)
+ builder.addAll(restrictions.nonPKRestrictedColumns());
+ return builder.build();
+ }
+
// Creates a simple select based on the given selection.
// Note that the results select statement should not be used for actual queries, but only for processing already
// queried data through processColumnFamily.
@@ -161,31 +182,16 @@ public class SelectStatement implements CQLStatement
cl.validateForRead(keyspace());
- int limit = getLimit(options);
- long now = System.currentTimeMillis();
- Pageable command = getPageableCommand(options, limit, now);
- int pageSize = getPageSize(options);
+ int nowInSec = FBUtilities.nowInSeconds();
+ ReadQuery query = getQuery(options, nowInSec);
- if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
- return execute(command, options, limit, now, state);
-
- QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
- return execute(pager, options, limit, now, pageSize);
- }
+ int pageSize = getPageSize(options);
- private Pageable getPageableCommand(QueryOptions options, int limit, long now) throws RequestValidationException
- {
- int limitForQuery = updateLimitForQuery(limit);
- if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
- return getRangeCommand(options, limitForQuery, now);
+ if (pageSize <= 0 || query.limits().count() <= pageSize)
+ return execute(query, options, state, nowInSec);
- List<ReadCommand> commands = getSliceCommands(options, limitForQuery, now);
- return commands == null ? null : new Pageable.ReadCommands(commands, limitForQuery);
- }
-
- public Pageable getPageableCommand(QueryOptions options) throws RequestValidationException
- {
- return getPageableCommand(options, getLimit(options), System.currentTimeMillis());
+ QueryPager pager = query.getPager(options.getPagingState());
+ return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
}
private int getPageSize(QueryOptions options)
@@ -201,103 +207,162 @@ public class SelectStatement implements CQLStatement
return pageSize;
}
- private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state)
- throws RequestValidationException, RequestExecutionException
+ public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
+ {
+ DataLimits limit = getLimit(options);
+ if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+ return getRangeCommand(options, limit, nowInSec);
+
+ return getSliceCommands(options, limit, nowInSec);
+ }
+
+ private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException
{
- List<Row> rows;
- if (command == null)
+ try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState()))
{
- rows = Collections.<Row>emptyList();
+ return processResults(data, options, nowInSec);
}
- else
+ }
+
+ // Thin wrapper over QueryPager so distributed and internal paging share the same execute() code
+ private static abstract class Pager
+ {
+ protected QueryPager pager;
+
+ protected Pager(QueryPager pager)
+ {
+ this.pager = pager;
+ }
+
+ public static Pager forInternalQuery(QueryPager pager, ReadOrderGroup orderGroup)
+ {
+ return new InternalPager(pager, orderGroup);
+ }
+
+ public static Pager forDistributedQuery(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
+ {
+ return new NormalPager(pager, consistency, clientState);
+ }
+
+ public boolean isExhausted()
+ {
+ return pager.isExhausted();
+ }
+
+ public PagingState state()
+ {
+ return pager.state();
+ }
+
+ public abstract PartitionIterator fetchPage(int pageSize);
+
+ public static class NormalPager extends Pager
{
- rows = command instanceof Pageable.ReadCommands
- ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState())
- : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
+ private final ConsistencyLevel consistency;
+ private final ClientState clientState;
+
+ private NormalPager(QueryPager pager, ConsistencyLevel consistency, ClientState clientState)
+ {
+ super(pager);
+ this.consistency = consistency;
+ this.clientState = clientState;
+ }
+
+ public PartitionIterator fetchPage(int pageSize)
+ {
+ return pager.fetchPage(pageSize, consistency, clientState);
+ }
}
- return processResults(rows, options, limit, now);
+ public static class InternalPager extends Pager
+ {
+ private final ReadOrderGroup orderGroup;
+
+ private InternalPager(QueryPager pager, ReadOrderGroup orderGroup)
+ {
+ super(pager);
+ this.orderGroup = orderGroup;
+ }
+
+ public PartitionIterator fetchPage(int pageSize)
+ {
+ return pager.fetchPageInternal(pageSize, orderGroup);
+ }
+ }
}
- private ResultMessage.Rows execute(QueryPager pager, QueryOptions options, int limit, long now, int pageSize)
+ private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec)
throws RequestValidationException, RequestExecutionException
{
if (selection.isAggregate())
- return pageAggregateQuery(pager, options, pageSize, now);
+ return pageAggregateQuery(pager, options, pageSize, nowInSec);
// We can't properly do post-query ordering if we page (see #6722)
checkFalse(needsPostQueryOrdering(),
- "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
- + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
+ "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
+ + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
- List<Row> page = pager.fetchPage(pageSize);
- ResultMessage.Rows msg = processResults(page, options, limit, now);
+ ResultMessage.Rows msg;
+ try (PartitionIterator page = pager.fetchPage(pageSize))
+ {
+ msg = processResults(page, options, nowInSec);
+ }
+ // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
+ // shouldn't be moved inside the 'try' above.
if (!pager.isExhausted())
msg.result.metadata.setHasMorePages(pager.state());
return msg;
}
- private ResultMessage.Rows pageAggregateQuery(QueryPager pager, QueryOptions options, int pageSize, long now)
- throws RequestValidationException, RequestExecutionException
+ private ResultMessage.Rows pageAggregateQuery(Pager pager, QueryOptions options, int pageSize, int nowInSec)
+ throws RequestValidationException, RequestExecutionException
{
- Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
+ Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
while (!pager.isExhausted())
{
- for (org.apache.cassandra.db.Row row : pager.fetchPage(pageSize))
+ try (PartitionIterator iter = pager.fetchPage(pageSize))
{
- // Not columns match the query, skip
- if (row.cf == null)
- continue;
-
- processColumnFamily(row.key.getKey(), row.cf, options, now, result);
+ while (iter.hasNext())
+ processPartition(iter.next(), options, result, nowInSec);
}
}
return new ResultMessage.Rows(result.build(options.getProtocolVersion()));
}
- public ResultMessage.Rows processResults(List<Row> rows, QueryOptions options, int limit, long now) throws RequestValidationException
+ private ResultMessage.Rows processResults(PartitionIterator partitions, QueryOptions options, int nowInSec) throws RequestValidationException
{
- ResultSet rset = process(rows, options, limit, now);
+ ResultSet rset = process(partitions, options, nowInSec);
return new ResultMessage.Rows(rset);
}
- static List<Row> readLocally(String keyspaceName, List<ReadCommand> cmds)
- {
- Keyspace keyspace = Keyspace.open(keyspaceName);
- List<Row> rows = new ArrayList<Row>(cmds.size());
- for (ReadCommand cmd : cmds)
- rows.add(cmd.getRow(keyspace));
- return rows;
- }
-
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- int limit = getLimit(options);
- long now = System.currentTimeMillis();
- Pageable command = getPageableCommand(options, limit, now);
+ int nowInSec = FBUtilities.nowInSeconds();
+ ReadQuery query = getQuery(options, nowInSec);
int pageSize = getPageSize(options);
- if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
+ try (ReadOrderGroup orderGroup = query.startOrderGroup())
{
- List<Row> rows = command == null
- ? Collections.<Row>emptyList()
- : (command instanceof Pageable.ReadCommands
- ? readLocally(keyspace(), ((Pageable.ReadCommands)command).commands)
- : ((RangeSliceCommand)command).executeLocally());
-
- return processResults(rows, options, limit, now);
+ if (pageSize <= 0 || query.limits().count() <= pageSize)
+ {
+ try (PartitionIterator data = query.executeInternal(orderGroup))
+ {
+ return processResults(data, options, nowInSec);
+ }
+ }
+ else
+ {
+ QueryPager pager = query.getPager(options.getPagingState());
+ return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
+ }
}
-
- QueryPager pager = QueryPagers.localPager(command);
- return execute(pager, options, limit, now, pageSize);
}
- public ResultSet process(List<Row> rows) throws InvalidRequestException
+ public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
{
- QueryOptions options = QueryOptions.DEFAULT;
- return process(rows, options, getLimit(options), System.currentTimeMillis());
+ return process(partitions, QueryOptions.DEFAULT, nowInSec);
}
public String keyspace()
@@ -326,372 +391,239 @@ public class SelectStatement implements CQLStatement
return restrictions;
}
- private List<ReadCommand> getSliceCommands(QueryOptions options, int limit, long now) throws RequestValidationException
+ private ReadQuery getSliceCommands(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
{
Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
+ if (keys.isEmpty())
+ return ReadQuery.EMPTY;
- List<ReadCommand> commands = new ArrayList<>(keys.size());
-
- IDiskAtomFilter filter = makeFilter(options, limit);
+ ClusteringIndexFilter filter = makeClusteringIndexFilter(options);
if (filter == null)
- return null;
+ return ReadQuery.EMPTY;
+
+ RowFilter rowFilter = getRowFilter(options);
// Note that we use the total limit for every key, which is potentially inefficient.
// However, IN + LIMIT is not a very sensible choice.
+ List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size());
for (ByteBuffer key : keys)
{
QueryProcessor.validateKey(key);
- // We should not share the slice filter amongst the commands (hence the cloneShallow), due to
- // SliceQueryFilter not being immutable due to its columnCounter used by the lastCounted() method
- // (this is fairly ugly and we should change that but that's probably not a tiny refactor to do that cleanly)
- commands.add(ReadCommand.create(keyspace(), ByteBufferUtil.clone(key), columnFamily(), now, filter.cloneShallow()));
+ DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key));
+ commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter));
}
- return commands;
+ return new SinglePartitionReadCommand.Group(commands, limit);
}
- private RangeSliceCommand getRangeCommand(QueryOptions options, int limit, long now) throws RequestValidationException
+ private ReadQuery getRangeCommand(QueryOptions options, DataLimits limit, int nowInSec) throws RequestValidationException
{
- IDiskAtomFilter filter = makeFilter(options, limit);
- if (filter == null)
- return null;
+ ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options);
+ if (clusteringIndexFilter == null)
+ return ReadQuery.EMPTY;
- List<IndexExpression> expressions = getValidatedIndexExpressions(options);
+ RowFilter rowFilter = getRowFilter(options);
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
- AbstractBounds<RowPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
+ AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
return keyBounds == null
- ? null
- : new RangeSliceCommand(keyspace(), columnFamily(), now, filter, keyBounds, expressions, limit, !parameters.isDistinct, false);
- }
-
- private ColumnSlice makeStaticSlice()
- {
- // Note: we could use staticPrefix.start() for the start bound, but EMPTY gives us the
- // same effect while saving a few CPU cycles.
- return isReversed
- ? new ColumnSlice(cfm.comparator.staticPrefix().end(), Composites.EMPTY)
- : new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end());
+ ? ReadQuery.EMPTY
+ : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
}
- private IDiskAtomFilter makeFilter(QueryOptions options, int limit)
+ private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)
throws InvalidRequestException
{
- int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size();
if (parameters.isDistinct)
{
- // For distinct, we only care about fetching the beginning of each partition. If we don't have
- // static columns, we in fact only care about the first cell, so we query only that (we don't "group").
- // If we do have static columns, we do need to fetch the first full group (to have the static columns values).
-
- // See the comments on IGNORE_TOMBSTONED_PARTITIONS and CASSANDRA-8490 for why we use a special value for
- // DISTINCT queries on the partition key only.
- toGroup = selection.containsStaticColumns() ? toGroup : SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1, toGroup);
+ // We need to be able to distinguish between partitions that have live rows and those that don't. But
+ // doing so is not trivial since "having a live row" potentially depends on
+ // 1) when the query is performed, due to TTLs
+ // 2) how things reconcile together between different nodes
+ // so it's hard to optimize this properly internally. To keep it simple, we query
+ // for the first row of the partition and hence use Slices.ALL. We'll limit it to the first live
+ // row in getLimit(), however.
+ return new ClusteringIndexSliceFilter(Slices.ALL, false);
}
- else if (restrictions.isColumnRange())
- {
- List<Composite> startBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.START, options);
- List<Composite> endBounds = restrictions.getClusteringColumnsBoundsAsComposites(Bound.END, options);
- assert startBounds.size() == endBounds.size();
-
- // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
- // the part of the index to query so adding the static slice would be useless and confusing.
- // For 2i, static columns are retrieve in CompositesSearcher with each index hit.
- ColumnSlice staticSlice = selection.containsStaticColumns() && !restrictions.usesSecondaryIndexing()
- ? makeStaticSlice()
- : null;
-
- // The case where startBounds == 1 is common enough that it's worth optimizing
- if (startBounds.size() == 1)
- {
- ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0));
- if (slice.isAlwaysEmpty(cfm.comparator, isReversed))
- return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
-
- if (staticSlice == null)
- return sliceFilter(slice, limit, toGroup);
-
- if (isReversed)
- return slice.includes(cfm.comparator.reverseComparator(), staticSlice.start)
- ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup)
- : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup);
- else
- return slice.includes(cfm.comparator, staticSlice.finish)
- ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup)
- : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup);
- }
-
- List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
- for (int i = 0; i < startBounds.size(); i++)
- {
- ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
- if (!slice.isAlwaysEmpty(cfm.comparator, isReversed))
- l.add(slice);
- }
- if (l.isEmpty())
- return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
- if (staticSlice == null)
- return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup);
+ if (restrictions.isColumnRange())
+ {
+ Slices slices = makeSlices(options);
+ if (slices == Slices.NONE && !selection.containsStaticColumns())
+ return null;
- // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but if there is
- // a static slice, it could overlap with the 2nd slice. Check for it and correct if that's the case
- ColumnSlice[] slices;
- if (isReversed)
- {
- if (l.get(l.size() - 1).includes(cfm.comparator.reverseComparator(), staticSlice.start))
- {
- slices = l.toArray(new ColumnSlice[l.size()]);
- slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, Composites.EMPTY);
- }
- else
- {
- slices = l.toArray(new ColumnSlice[l.size()+1]);
- slices[slices.length-1] = staticSlice;
- }
- }
- else
- {
- if (l.get(0).includes(cfm.comparator, staticSlice.finish))
- {
- slices = new ColumnSlice[l.size()];
- slices[0] = new ColumnSlice(Composites.EMPTY, l.get(0).finish);
- for (int i = 1; i < l.size(); i++)
- slices[i] = l.get(i);
- }
- else
- {
- slices = new ColumnSlice[l.size()+1];
- slices[0] = staticSlice;
- for (int i = 0; i < l.size(); i++)
- slices[i+1] = l.get(i);
- }
- }
- return sliceFilter(slices, limit, toGroup);
+ return new ClusteringIndexSliceFilter(slices, isReversed);
}
else
{
- SortedSet<CellName> cellNames = getRequestedColumns(options);
- if (cellNames == null) // in case of IN () for the last column of the key
+ NavigableSet<Clustering> clusterings = getRequestedRows(options);
+ if (clusterings.isEmpty() && !selection.containsStaticColumns()) // in case of IN () for the last column of the key
return null;
- QueryProcessor.validateCellNames(cellNames, cfm.comparator);
- return new NamesQueryFilter(cellNames, true);
+
+ return new ClusteringIndexNamesFilter(clusterings, isReversed);
}
}
- private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup)
+ private Slices makeSlices(QueryOptions options)
+ throws InvalidRequestException
{
- return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup);
- }
+ SortedSet<Slice.Bound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
+ SortedSet<Slice.Bound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
+ assert startBounds.size() == endBounds.size();
- private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup)
- {
- assert ColumnSlice.validateSlices(slices, cfm.comparator, isReversed) : String.format("Invalid slices: " + Arrays.toString(slices) + (isReversed ? " (reversed)" : ""));
- return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+ // The case where startBounds == 1 is common enough that it's worth optimizing
+ if (startBounds.size() == 1)
+ {
+ Slice.Bound start = startBounds.first();
+ Slice.Bound end = endBounds.first();
+ return cfm.comparator.compare(start, end) > 0
+ ? Slices.NONE
+ : Slices.with(cfm.comparator, Slice.make(start, end));
+ }
+
+ Slices.Builder builder = new Slices.Builder(cfm.comparator, startBounds.size());
+ Iterator<Slice.Bound> startIter = startBounds.iterator();
+ Iterator<Slice.Bound> endIter = endBounds.iterator();
+ while (startIter.hasNext() && endIter.hasNext())
+ {
+ Slice.Bound start = startIter.next();
+ Slice.Bound end = endIter.next();
+
+ // Ignore slices that are nonsensical
+ if (cfm.comparator.compare(start, end) > 0)
+ continue;
+
+ builder.add(start, end);
+ }
+
+ return builder.build();
}
/**
* May be used by custom QueryHandler implementations
*/
- public int getLimit(QueryOptions options) throws InvalidRequestException
+ public DataLimits getLimit(QueryOptions options) throws InvalidRequestException
{
- if (limit != null)
+ int userLimit = -1;
+ // If we aggregate, the limit really applies to the number of rows returned to the user, not to what is queried, and
+ // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
+ // return 1 result and can therefore basically ignore the user LIMIT in this case.
+ // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus
+ // able to apply the user limit properly.
+ if (limit != null && !selection.isAggregate())
{
ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit");
// treat UNSET limit value as 'unlimited'
- if (b == UNSET_BYTE_BUFFER)
- return Integer.MAX_VALUE;
- try
- {
- Int32Type.instance.validate(b);
- int l = Int32Type.instance.compose(b);
- checkTrue(l > 0, "LIMIT must be strictly positive");
- return l;
- }
- catch (MarshalException e)
+ if (b != UNSET_BYTE_BUFFER)
{
- throw new InvalidRequestException("Invalid limit value");
+ try
+ {
+ Int32Type.instance.validate(b);
+ userLimit = Int32Type.instance.compose(b);
+ checkTrue(userLimit > 0, "LIMIT must be strictly positive");
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException("Invalid limit value");
+ }
}
}
- return Integer.MAX_VALUE;
- }
- private int updateLimitForQuery(int limit)
- {
- // Internally, we don't support exclusive bounds for slices. Instead, we query one more element if necessary
- // and exclude it later (in processColumnFamily)
- return restrictions.isNonCompositeSliceWithExclusiveBounds() && limit != Integer.MAX_VALUE
- ? limit + 1
- : limit;
+ if (parameters.isDistinct)
+ return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit);
+
+ return userLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(userLimit);
}
- private SortedSet<CellName> getRequestedColumns(QueryOptions options) throws InvalidRequestException
+ private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException
{
// Note: getRequestedColumns don't handle static columns, but due to CASSANDRA-5762
// we always do a slice for CQL3 tables, so it's ok to ignore them here
assert !restrictions.isColumnRange();
- SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
- for (Composite composite : restrictions.getClusteringColumnsAsComposites(options))
- columns.addAll(addSelectedColumns(composite));
- return columns;
- }
-
- private SortedSet<CellName> addSelectedColumns(Composite prefix)
- {
- if (cfm.comparator.isDense())
- {
- return FBUtilities.singleton(cfm.comparator.create(prefix, null), cfm.comparator);
- }
- else
- {
- SortedSet<CellName> columns = new TreeSet<CellName>(cfm.comparator);
-
- // We need to query the selected column as well as the marker
- // column (for the case where the row exists but has no columns outside the PK)
- // Two exceptions are "static CF" (non-composite non-compact CF) and "super CF"
- // that don't have marker and for which we must query all columns instead
- if (cfm.comparator.isCompound() && !cfm.isSuper())
- {
- // marker
- columns.add(cfm.comparator.rowMarker(prefix));
-
- // selected columns
- for (ColumnDefinition def : selection.getColumns())
- if (def.isRegular() || def.isStatic())
- columns.add(cfm.comparator.create(prefix, def));
- }
- else
- {
- // We now that we're not composite so we can ignore static columns
- for (ColumnDefinition def : cfm.regularColumns())
- columns.add(cfm.comparator.create(prefix, def));
- }
- return columns;
- }
+ return restrictions.getClusteringColumns(options);
}
/**
* May be used by custom QueryHandler implementations
*/
- public List<IndexExpression> getValidatedIndexExpressions(QueryOptions options) throws InvalidRequestException
+ public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException
{
- if (!restrictions.usesSecondaryIndexing())
- return Collections.emptyList();
-
ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
-
- List<IndexExpression> expressions = restrictions.getIndexExpressions(secondaryIndexManager, options);
-
- secondaryIndexManager.validateIndexSearchersForQuery(expressions);
-
- return expressions;
- }
-
- private CellName makeExclusiveSliceBound(Bound bound, CellNameType type, QueryOptions options) throws InvalidRequestException
- {
- if (restrictions.areRequestedBoundsInclusive(bound))
- return null;
-
- return type.makeCellName(restrictions.getClusteringColumnsBounds(bound, options).get(0));
+ RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options);
+ secondaryIndexManager.validateFilter(filter);
+ return filter;
}
- private Iterator<Cell> applySliceRestriction(final Iterator<Cell> cells, final QueryOptions options) throws InvalidRequestException
+ private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException
{
- final CellNameType type = cfm.comparator;
-
- final CellName excludedStart = makeExclusiveSliceBound(Bound.START, type, options);
- final CellName excludedEnd = makeExclusiveSliceBound(Bound.END, type, options);
-
- return Iterators.filter(cells, new Predicate<Cell>()
+ Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson);
+ while (partitions.hasNext())
{
- public boolean apply(Cell c)
+ try (RowIterator partition = partitions.next())
{
- // For dynamic CF, the column could be out of the requested bounds (because we don't support strict bounds internally (unless
- // the comparator is composite that is)), filter here
- return !((excludedStart != null && type.compare(c.name(), excludedStart) == 0)
- || (excludedEnd != null && type.compare(c.name(), excludedEnd) == 0));
+ processPartition(partition, options, result, nowInSec);
}
- });
- }
-
- private ResultSet process(List<Row> rows, QueryOptions options, int limit, long now) throws InvalidRequestException
- {
- Selection.ResultSetBuilder result = selection.resultSetBuilder(now, parameters.isJson);
- for (org.apache.cassandra.db.Row row : rows)
- {
- // Not columns match the query, skip
- if (row.cf == null)
- continue;
-
- processColumnFamily(row.key.getKey(), row.cf, options, now, result);
}
ResultSet cqlRows = result.build(options.getProtocolVersion());
orderResults(cqlRows);
- // Internal calls always return columns in the comparator order, even when reverse was set
- if (isReversed)
- cqlRows.reverse();
-
- // Trim result if needed to respect the user limit
- cqlRows.trim(limit);
return cqlRows;
}
- // Used by ModificationStatement for CAS operations
- void processColumnFamily(ByteBuffer key, ColumnFamily cf, QueryOptions options, long now, Selection.ResultSetBuilder result)
- throws InvalidRequestException
+ public static ByteBuffer[] getComponents(CFMetaData cfm, DecoratedKey dk)
{
- CFMetaData cfm = cf.metadata();
- ByteBuffer[] keyComponents = null;
+ ByteBuffer key = dk.getKey();
if (cfm.getKeyValidator() instanceof CompositeType)
{
- keyComponents = ((CompositeType)cfm.getKeyValidator()).split(key);
+ return ((CompositeType)cfm.getKeyValidator()).split(key);
}
else
{
- keyComponents = new ByteBuffer[]{ key };
+ return new ByteBuffer[]{ key };
}
+ }
- Iterator<Cell> cells = cf.getSortedColumns().iterator();
- if (restrictions.isNonCompositeSliceWithExclusiveBounds())
- cells = applySliceRestriction(cells, options);
-
+ // Used by ModificationStatement for CAS operations
+ void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec)
+ throws InvalidRequestException
+ {
int protocolVersion = options.getProtocolVersion();
- CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
- // If there is static columns but there is no non-static row, then provided the select was a full
- // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
- // then we want to include the static columns in the result set (and we're done).
- CQL3Row staticRow = iter.getStaticRow();
- if (staticRow != null && !iter.hasNext() && !restrictions.usesSecondaryIndexing() && restrictions.hasNoClusteringColumnsRestriction())
+ ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
+
+ Row staticRow = partition.staticRow().takeAlias();
+ // If there is no rows, then provided the select was a full partition selection
+ // (i.e. not a 2ndary index search and there was no condition on clustering columns),
+ // we want to include static columns and we're done.
+ if (!partition.hasNext())
{
- result.newRow(protocolVersion);
- for (ColumnDefinition def : selection.getColumns())
+ if (!staticRow.isEmpty() && (!restrictions.usesSecondaryIndexing() || cfm.isStaticCompactTable()) && !restrictions.hasClusteringColumnsRestriction())
{
- switch (def.kind)
+ result.newRow(protocolVersion);
+ for (ColumnDefinition def : selection.getColumns())
{
- case PARTITION_KEY:
- result.add(keyComponents[def.position()]);
- break;
- case STATIC:
- addValue(result, def, staticRow, options);
- break;
- default:
- result.add((ByteBuffer)null);
+ switch (def.kind)
+ {
+ case PARTITION_KEY:
+ result.add(keyComponents[def.position()]);
+ break;
+ case STATIC:
+ addValue(result, def, staticRow, nowInSec, protocolVersion);
+ break;
+ default:
+ result.add((ByteBuffer)null);
+ }
}
}
return;
}
- while (iter.hasNext())
+ while (partition.hasNext())
{
- CQL3Row cql3Row = iter.next();
-
- // Respect requested order
+ Row row = partition.next();
result.newRow(protocolVersion);
// Respect selection order
for (ColumnDefinition def : selection.getColumns())
@@ -702,41 +634,35 @@ public class SelectStatement implements CQLStatement
result.add(keyComponents[def.position()]);
break;
case CLUSTERING_COLUMN:
- result.add(cql3Row.getClusteringColumn(def.position()));
- break;
- case COMPACT_VALUE:
- result.add(cql3Row.getColumn(null));
+ result.add(row.clustering().get(def.position()));
break;
case REGULAR:
- addValue(result, def, cql3Row, options);
+ addValue(result, def, row, nowInSec, protocolVersion);
break;
case STATIC:
- addValue(result, def, staticRow, options);
+ addValue(result, def, staticRow, nowInSec, protocolVersion);
break;
}
}
}
}
- private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, CQL3Row row, QueryOptions options)
+ private static void addValue(Selection.ResultSetBuilder result, ColumnDefinition def, Row row, int nowInSec, int protocolVersion)
{
- if (row == null)
+ if (def.isComplex())
{
- result.add((ByteBuffer)null);
- return;
+ // Collections are the only complex types we have so far
+ assert def.type.isCollection() && def.type.isMultiCell();
+ Iterator<Cell> cells = row.getCells(def);
+ if (cells == null)
+ result.add((ByteBuffer)null);
+ else
+ result.add(((CollectionType)def.type).serializeForNativeProtocol(def, cells, protocolVersion));
}
-
- if (def.type.isMultiCell())
+ else
{
- List<Cell> cells = row.getMultiCellColumn(def.name);
- ByteBuffer buffer = cells == null
- ? null
- : ((CollectionType)def.type).serializeForNativeProtocol(def, cells, options.getProtocolVersion());
- result.add(buffer);
- return;
+ result.add(row.getCell(def), nowInSec);
}
-
- result.add(row.getColumn(def.name));
}
private boolean needsPostQueryOrdering()
@@ -796,9 +722,6 @@ public class SelectStatement implements CQLStatement
isReversed = isReversed(cfm);
}
- if (isReversed)
- restrictions.reverse();
-
checkNeedsFiltering(restrictions);
SelectStatement stmt = new SelectStatement(cfm,
@@ -832,7 +755,8 @@ public class SelectStatement implements CQLStatement
whereClause,
boundNames,
selection.containsOnlyStaticColumns(),
- selection.containsACollection());
+ selection.containsACollection(),
+ parameters.allowFiltering);
}
catch (UnrecognizedEntityException e)
{
@@ -980,47 +904,12 @@ public class SelectStatement implements CQLStatement
{
// We will potentially filter data if either:
// - Have more than one IndexExpression
- // - Have no index expression and the column filter is not the identity
+ // - Have no index expression and the row filter is not the identity
checkFalse(restrictions.needFiltering(),
"Cannot execute this query as it might involve data filtering and " +
"thus may have unpredictable performance. If you want to execute " +
"this query despite the performance unpredictability, use ALLOW FILTERING");
}
-
- // We don't internally support exclusive slice bounds on non-composite tables. To deal with it we do an
- // inclusive slice and remove post-query the value that shouldn't be returned. One problem however is that
- // if there is a user limit, that limit may make the query return before the end of the slice is reached,
- // in which case, once we'll have removed bound post-query, we might end up with less results than
- // requested which would be incorrect. For single-partition query, this is not a problem, we just ask for
- // one more result (see updateLimitForQuery()) since that's enough to compensate for that problem. For key
- // range however, each returned row may include one result that will have to be trimmed, so we would have
- // to bump the query limit by N where N is the number of rows we will return, but we don't know that in
- // advance. So, since we currently don't have a good way to handle such query, we refuse it (#7059) rather
- // than answering with something that is wrong.
- if (restrictions.isNonCompositeSliceWithExclusiveBounds() && restrictions.isKeyRange() && limit != null)
- {
- SingleColumnRelation rel = findInclusiveClusteringRelationForCompact(restrictions.cfm);
- throw invalidRequest("The query requests a restriction of rows with a strict bound (%s) over a range of partitions. "
- + "This is not supported by the underlying storage engine for COMPACT tables if a LIMIT is provided. "
- + "Please either make the condition non strict (%s) or remove the user LIMIT", rel, rel.withNonStrictOperator());
- }
- }
-
- private SingleColumnRelation findInclusiveClusteringRelationForCompact(CFMetaData cfm)
- {
- for (Relation r : whereClause)
- {
- // We only call this when sliceRestriction != null, i.e. for compact table with non composite comparator,
- // so it can't be a MultiColumnRelation.
- SingleColumnRelation rel = (SingleColumnRelation)r;
-
- if (cfm.getColumnDefinition(rel.getEntity().prepare(cfm)).isClusteringColumn()
- && (rel.operator() == Operator.GT || rel.operator() == Operator.LT))
- return rel;
- }
-
- // We're not supposed to call this method unless we know this can't happen
- throw new AssertionError();
}
private boolean containsAlias(final ColumnIdentifier name)