You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/20 10:15:16 UTC
[1/2] Add static columns in CQL3
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 b2b3055c7 -> b09d87691
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 4852cf7..cd5f2a2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -31,9 +31,9 @@ import org.apache.cassandra.utils.Pair;
*/
public class DeleteStatement extends ModificationStatement
{
- private DeleteStatement(CFMetaData cfm, Attributes attrs)
+ private DeleteStatement(StatementType type, CFMetaData cfm, Attributes attrs)
{
- super(cfm, attrs);
+ super(type, cfm, attrs);
}
public boolean requireFullClusteringKey()
@@ -44,15 +44,30 @@ public class DeleteStatement extends ModificationStatement
public ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
throws InvalidRequestException
{
- CFDefinition cfDef = cfm.getCfDef();
ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+ addUpdateForKey(cf, key, builder, params);
+ return cf;
+ }
+
+ public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params)
+ throws InvalidRequestException
+ {
+ CFDefinition cfDef = cfm.getCfDef();
List<Operation> deletions = getOperations();
- boolean fullKey = builder.componentCount() == cfDef.columns.size();
+ boolean fullKey = builder.componentCount() == cfDef.clusteringColumnsCount();
boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || deletions.isEmpty());
if (!deletions.isEmpty() && isRange)
- throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletions.get(0).columnName));
+ {
+ // We only get there if we have at least one non-static column selected, as otherwise the builder will be
+ // the "static" builder and isRange will be false. But we may still have static columns, so pick the first
+ // non static one for the error message so it's not confusing
+ for (Operation deletion : deletions)
+ if (cfm.getCfDef().get(deletion.columnName).kind != CFDefinition.Name.Kind.STATIC)
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", getFirstEmptyKey(), deletion.columnName));
+ throw new AssertionError();
+ }
if (deletions.isEmpty() && builder.componentCount() == 0)
{
@@ -83,8 +98,6 @@ public class DeleteStatement extends ModificationStatement
}
}
}
-
- return cf;
}
public static class Parsed extends ModificationStatement.Parsed
@@ -96,7 +109,7 @@ public class DeleteStatement extends ModificationStatement
Attributes.Raw attrs,
List<Operation.RawDeletion> deletions,
List<Relation> whereClause,
- List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions)
+ List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions)
{
super(name, attrs, conditions, false);
this.deletions = deletions;
@@ -105,7 +118,7 @@ public class DeleteStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- DeleteStatement stmt = new DeleteStatement(cfDef.cfm, attrs);
+ DeleteStatement stmt = new DeleteStatement(ModificationStatement.StatementType.DELETE, cfDef.cfm, attrs);
for (Operation.RawDeletion deletion : deletions)
{
@@ -115,7 +128,7 @@ public class DeleteStatement extends ModificationStatement
// For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
// list. However, we support having the value name for coherence with the static/sparse case
- if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+ if (name.isPrimaryKeyColumn())
throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", name));
Operation op = deletion.prepare(name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 676286c..ac8d2e1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,12 +20,15 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import org.github.jamm.MemoryMeter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnSlice;
@@ -56,6 +59,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
private static boolean loggedCounterTTL = false;
private static boolean loggedCounterTimestamp = false;
+ public static enum StatementType { INSERT, UPDATE, DELETE }
+ public final StatementType type;
+
public final CFMetaData cfm;
public final Attributes attrs;
@@ -63,11 +69,25 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
private final List<Operation> columnOperations = new ArrayList<Operation>();
private int boundTerms;
- private List<Operation> columnConditions;
+ // Separating normal and static conditions makes things somewhat easier
+ private List<ColumnCondition> columnConditions;
+ private List<ColumnCondition> staticConditions;
private boolean ifNotExists;
- public ModificationStatement(CFMetaData cfm, Attributes attrs)
+ private boolean hasNoClusteringColumns = true;
+ private boolean setsOnlyStaticColumns;
+
+ private final Function<ColumnCondition, ColumnIdentifier> getColumnForCondition = new Function<ColumnCondition, ColumnIdentifier>()
{
+ public ColumnIdentifier apply(ColumnCondition cond)
+ {
+ return cond.column.name;
+ }
+ };
+
+ public ModificationStatement(StatementType type, CFMetaData cfm, Attributes attrs)
+ {
+ this.type = type;
this.cfm = cfm;
this.attrs = attrs;
}
@@ -78,11 +98,12 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
+ meter.measureDeep(attrs)
+ meter.measureDeep(processedKeys)
+ meter.measureDeep(columnOperations)
- + (columnConditions == null ? 0 : meter.measureDeep(columnConditions));
+ + (columnConditions == null ? 0 : meter.measureDeep(columnConditions))
+ + (staticConditions == null ? 0 : meter.measureDeep(staticConditions));
}
public abstract boolean requireFullClusteringKey();
- public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+ public abstract void addUpdateForKey(ColumnFamily updates, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
public int getBoundTerms()
{
@@ -155,6 +176,15 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public void addOperation(Operation op)
{
+ if (op.isStatic(cfm))
+ {
+ if (columnOperations.isEmpty())
+ setsOnlyStaticColumns = true;
+ }
+ else
+ {
+ setsOnlyStaticColumns = false;
+ }
columnOperations.add(op);
}
@@ -163,12 +193,31 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return columnOperations;
}
- public void addCondition(Operation op)
+ public Iterable<ColumnIdentifier> getColumnsWithConditions()
{
- if (columnConditions == null)
- columnConditions = new ArrayList<Operation>();
+ if (ifNotExists)
+ return null;
+
+ return Iterables.concat(columnConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(columnConditions, getColumnForCondition),
+ staticConditions == null ? Collections.<ColumnIdentifier>emptyList() : Iterables.transform(staticConditions, getColumnForCondition));
+ }
- columnConditions.add(op);
+ public void addCondition(ColumnCondition cond) throws InvalidRequestException
+ {
+ List<ColumnCondition> conds = null;
+ if (cond.column.kind == CFDefinition.Name.Kind.STATIC)
+ {
+ if (staticConditions == null)
+ staticConditions = new ArrayList<ColumnCondition>();
+ conds = staticConditions;
+ }
+ else
+ {
+ if (columnConditions == null)
+ columnConditions = new ArrayList<ColumnCondition>();
+ conds = columnConditions;
+ }
+ conds.add(cond);
}
public void setIfNotExistCondition()
@@ -176,13 +225,20 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
ifNotExists = true;
}
- private void addKeyValues(ColumnIdentifier name, Restriction values) throws InvalidRequestException
+ public boolean hasIfNotExistCondition()
{
- if (processedKeys.put(name, values) != null)
- throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+ return ifNotExists;
}
- public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException
+ private void addKeyValues(CFDefinition.Name name, Restriction values) throws InvalidRequestException
+ {
+ if (name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS)
+ hasNoClusteringColumns = false;
+ if (processedKeys.put(name.name, values) != null)
+ throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name.name));
+ }
+
+ public void addKeyValue(CFDefinition.Name name, Term value) throws InvalidRequestException
{
addKeyValues(name, new Restriction.EQ(value, false));
}
@@ -233,10 +289,11 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
}
- addKeyValues(name.name, restriction);
+ addKeyValues(name, restriction);
break;
case VALUE_ALIAS:
case COLUMN_METADATA:
+ case STATIC:
throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
}
}
@@ -248,7 +305,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
CFDefinition cfDef = cfm.getCfDef();
ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
- for (CFDefinition.Name name : cfDef.keys.values())
+ for (CFDefinition.Name name : cfDef.partitionKeys())
{
Restriction r = processedKeys.get(name.name);
if (r == null)
@@ -262,7 +319,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
{
if (val == null)
throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
- keys.add(keyBuilder.copy().add(val).build());
+ ByteBuffer key = keyBuilder.copy().add(val).build();
+ ThriftValidation.validateKey(cfm, key);
+ keys.add(key);
}
}
else
@@ -281,10 +340,46 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
throws InvalidRequestException
{
+ // If the only updated/deleted columns are static, then we don't need clustering columns.
+ // And in fact, unless it is an INSERT, we reject if clustering columns are provided as that
+ // suggests something unintended. For instance, given:
+ // CREATE TABLE t (k int, v int, s int static, PRIMARY KEY (k, v))
+ // it can make sense to do:
+ // INSERT INTO t(k, v, s) VALUES (0, 1, 2)
+ // but both
+ // UPDATE t SET s = 3 WHERE k = 0 AND v = 1
+ // DELETE s FROM t WHERE k = 0 AND v = 1
+ // sounds like you don't really understand what you are doing.
+ if (setsOnlyStaticColumns && columnConditions == null && (type != StatementType.INSERT || hasNoClusteringColumns))
+ {
+ // Reject if any clustering columns is set
+ for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
+ if (processedKeys.get(name.name) != null)
+ throw new InvalidRequestException(String.format("Invalid restriction on clustering column %s since the %s statement modifies only static columns", name.name, type));
+ return cfm.getStaticColumnNameBuilder();
+ }
+
+ return createClusteringPrefixBuilderInternal(variables);
+ }
+
+ private ColumnNameBuilder updatePrefixFor(ByteBuffer name, ColumnNameBuilder prefix)
+ {
+ return isStatic(name) ? cfm.getStaticColumnNameBuilder() : prefix;
+ }
+
+ public boolean isStatic(ByteBuffer name)
+ {
+ ColumnDefinition def = cfm.getColumnDefinition(name);
+ return def != null && def.type == ColumnDefinition.Type.STATIC;
+ }
+
+ private ColumnNameBuilder createClusteringPrefixBuilderInternal(List<ByteBuffer> variables)
+ throws InvalidRequestException
+ {
CFDefinition cfDef = cfm.getCfDef();
ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
CFDefinition.Name firstEmptyKey = null;
- for (CFDefinition.Name name : cfDef.columns.values())
+ for (CFDefinition.Name name : cfDef.clusteringColumns())
{
Restriction r = processedKeys.get(name.name);
if (r == null)
@@ -312,7 +407,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
protected CFDefinition.Name getFirstEmptyKey()
{
- for (CFDefinition.Name name : cfm.getCfDef().columns.values())
+ for (CFDefinition.Name name : cfm.getCfDef().clusteringColumns())
{
if (processedKeys.get(name.name) == null)
return name;
@@ -354,8 +449,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
int i = 0;
for (ByteBuffer name : toRead)
{
- ByteBuffer start = clusteringPrefix.copy().add(name).build();
- ByteBuffer finish = clusteringPrefix.copy().add(name).buildAsEndOfRange();
+ ColumnNameBuilder prefix = updatePrefixFor(name, clusteringPrefix);
+ ByteBuffer start = prefix.copy().add(name).build();
+ ByteBuffer finish = prefix.copy().add(name).buildAsEndOfRange();
slices[i++] = new ColumnSlice(start, finish);
}
@@ -392,7 +488,9 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
public boolean hasConditions()
{
- return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
+ return ifNotExists
+ || (columnConditions != null && !columnConditions.isEmpty())
+ || (staticConditions != null && !staticConditions.isEmpty());
}
public ResultMessage execute(QueryState queryState, QueryOptions options)
@@ -434,20 +532,14 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (keys.size() > 1)
throw new InvalidRequestException("IN on the partition key is not supported with conditional updates");
- ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
-
ByteBuffer key = keys.get(0);
- ThriftValidation.validateKey(cfm, key);
-
- UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null);
- ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams);
// It's cleaner to use the query timestamp below, but it's in seconds while the conditions expects microseconds, so just
// put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divide it).
- long now = queryState.getTimestamp() * 1000;
- CASConditions conditions = ifNotExists
- ? new NotExistCondition(clusteringPrefix, now)
- : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now);
+ CQL3CasConditions conditions = new CQL3CasConditions(cfm, queryState.getTimestamp() * 1000);
+ ColumnNameBuilder prefix = createClusteringPrefixBuilder(variables);
+ ColumnFamily updates = UnsortedColumns.factory.create(cfm);
+ addUpdatesAndConditions(key, prefix, updates, conditions, variables, getTimestamp(queryState.getTimestamp(), variables));
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
@@ -459,16 +551,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return new ResultMessage.Rows(buildCasResultSet(key, result));
}
+ public void addUpdatesAndConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, ColumnFamily updates, CQL3CasConditions conditions, List<ByteBuffer> variables, long now)
+ throws InvalidRequestException
+ {
+ UpdateParameters updParams = new UpdateParameters(cfm, variables, now, getTimeToLive(variables), null);
+ addUpdateForKey(updates, key, clusteringPrefix, updParams);
+
+ if (ifNotExists)
+ {
+ // If we use ifNotExists, if the statement applies to any non static columns, then the condition is on the row of the non-static
+ // columns and the prefix should be the rowPrefix. But if only static columns are set, then the ifNotExists applies to the existence
+ // of any static columns and we should use the prefix for the "static part" of the partition.
+ conditions.addNotExist(setsOnlyStaticColumns ? cfm.getStaticColumnNameBuilder() : clusteringPrefix);
+ }
+ else
+ {
+ if (columnConditions != null)
+ conditions.addConditions(clusteringPrefix, columnConditions, variables);
+ if (staticConditions != null)
+ conditions.addConditions(cfm.getStaticColumnNameBuilder(), staticConditions, variables);
+ }
+ }
+
private ResultSet buildCasResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
{
+ return buildCasResultSet(keyspace(), key, columnFamily(), cf, getColumnsWithConditions(), false);
+ }
+
+ public static ResultSet buildCasResultSet(String ksName, ByteBuffer key, String cfName, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+ throws InvalidRequestException
+ {
boolean success = cf == null;
- ColumnSpecification spec = new ColumnSpecification(keyspace(), columnFamily(), CAS_RESULT_COLUMN, BooleanType.instance);
+ ColumnSpecification spec = new ColumnSpecification(ksName, cfName, CAS_RESULT_COLUMN, BooleanType.instance);
ResultSet.Metadata metadata = new ResultSet.Metadata(Collections.singletonList(spec));
List<List<ByteBuffer>> rows = Collections.singletonList(Collections.singletonList(BooleanType.instance.decompose(success)));
ResultSet rs = new ResultSet(metadata, rows);
- return success ? rs : merge(rs, buildCasFailureResultSet(key, cf));
+ return success ? rs : merge(rs, buildCasFailureResultSet(key, cf, columnsWithConditions, isBatch));
}
private static ResultSet merge(ResultSet left, ResultSet right)
@@ -478,31 +598,44 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
else if (right.size() == 0)
return left;
- assert left.size() == 1 && right.size() == 1;
+ assert left.size() == 1;
int size = left.metadata.names.size() + right.metadata.names.size();
List<ColumnSpecification> specs = new ArrayList<ColumnSpecification>(size);
specs.addAll(left.metadata.names);
specs.addAll(right.metadata.names);
- List<ByteBuffer> row = new ArrayList<ByteBuffer>(size);
- row.addAll(left.rows.get(0));
- row.addAll(right.rows.get(0));
- return new ResultSet(new ResultSet.Metadata(specs), Collections.singletonList(row));
+ List<List<ByteBuffer>> rows = new ArrayList<>(right.size());
+ for (int i = 0; i < right.size(); i++)
+ {
+ List<ByteBuffer> row = new ArrayList<ByteBuffer>(size);
+ row.addAll(left.rows.get(0));
+ row.addAll(right.rows.get(i));
+ rows.add(row);
+ }
+ return new ResultSet(new ResultSet.Metadata(specs), rows);
}
- private ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf) throws InvalidRequestException
+ private static ResultSet buildCasFailureResultSet(ByteBuffer key, ColumnFamily cf, Iterable<ColumnIdentifier> columnsWithConditions, boolean isBatch)
+ throws InvalidRequestException
{
- CFDefinition cfDef = cfm.getCfDef();
+ CFDefinition cfDef = cf.metadata().getCfDef();
Selection selection;
- if (ifNotExists)
+ if (columnsWithConditions == null)
{
selection = Selection.wildcard(cfDef);
}
else
{
- List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>(columnConditions.size());
- for (Operation condition : columnConditions)
- names.add(cfDef.get(condition.columnName));
+ List<CFDefinition.Name> names = new ArrayList<CFDefinition.Name>();
+ // Adding the partition key and clustering columns for batches to disambiguate if the conditions span multiple rows (we don't add them outside
+ // of batches for compatibility's sake).
+ if (isBatch)
+ {
+ names.addAll(cfDef.partitionKeys());
+ names.addAll(cfDef.clusteringColumns());
+ }
+ for (ColumnIdentifier id : columnsWithConditions)
+ names.add(cfDef.get(id));
selection = Selection.forColumns(names);
}
@@ -548,7 +681,8 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
for (ByteBuffer key: keys)
{
ThriftValidation.validateKey(cfm, key);
- ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
+ ColumnFamily cf = UnsortedColumns.factory.create(cfm);
+ addUpdateForKey(cf, key, clusteringPrefix, params);
mutations.add(makeMutation(key, cf, cl, isBatch));
}
return mutations;
@@ -570,104 +704,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return isCounter() ? new CounterMutation(rm, cl) : rm;
}
- private static abstract class CQL3CasConditions implements CASConditions
- {
- protected final ColumnNameBuilder rowPrefix;
- protected final long now;
-
- protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
- {
- this.rowPrefix = rowPrefix;
- this.now = now;
- }
-
- public IDiskAtomFilter readFilter()
- {
- // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
- // but all values on why there were conditions are null" and "row doesn't exists", and we can't rely on the
- // row marker for that (see #6623)
- return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
- }
- }
-
- private static class NotExistCondition extends CQL3CasConditions
- {
- private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
- {
- super(rowPrefix, now);
- }
-
- public boolean appliesTo(ColumnFamily current)
- {
- return current == null || current.hasOnlyTombstones(now);
- }
- }
-
- private static class ColumnsConditions extends CQL3CasConditions
- {
- private final ColumnFamily expected;
-
- private ColumnsConditions(ColumnNameBuilder rowPrefix,
- CFMetaData cfm,
- ByteBuffer key,
- Collection<Operation> conditions,
- List<ByteBuffer> variables,
- long now) throws InvalidRequestException
- {
- super(rowPrefix, now);
- this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
-
- // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
- // for it to expire before the actual build of the conditions which would break since we would then testing for the presence of tombstones.
- UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
-
- // Conditions
- for (Operation condition : conditions)
- condition.execute(key, expected, rowPrefix.copy(), params);
- }
-
- public boolean appliesTo(ColumnFamily current)
- {
- if (current == null)
- return false;
-
- for (Column e : expected)
- {
- Column c = current.getColumn(e.name());
- if (e.isLive(now))
- {
- if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
- return false;
- }
- else
- {
- // If we have a tombstone in expected, it means the condition tests that the column is
- // null, so check that we have no value
- if (c != null && c.isLive(now))
- return false;
- }
- }
- return true;
- }
-
- @Override
- public String toString()
- {
- return expected.toString();
- }
- }
-
public static abstract class Parsed extends CFStatement
{
protected final Attributes.Raw attrs;
- private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
+ private final List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions;
private final boolean ifNotExists;
- protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists)
+ protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions, boolean ifNotExists)
{
super(name);
this.attrs = attrs;
- this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions;
+ this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions;
this.ifNotExists = ifNotExists;
}
@@ -709,24 +756,13 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
}
else
{
- for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
+ for (Pair<ColumnIdentifier, ColumnCondition.Raw> entry : conditions)
{
CFDefinition.Name name = cfDef.get(entry.left);
if (name == null)
throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
- /*
- * Lists column names are based on a server-side generated timeuuid. So we can't allow lists
- * operation or that would yield unexpected results (update that should apply wouldn't). So for
- * now, we just refuse lists, which also save use from having to bother about the read that some
- * list operation involve.
- */
- if (name.type instanceof ListType)
- throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
-
- Operation condition = entry.right.prepare(name);
- assert !condition.requiresRead();
-
+ ColumnCondition condition = entry.right.prepare(name);
condition.collectMarkerSpecification(boundNames);
switch (name.kind)
@@ -736,6 +772,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
case VALUE_ALIAS:
case COLUMN_METADATA:
+ case STATIC:
stmt.addCondition(condition);
break;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 52a7c70..2636c83 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -83,18 +83,57 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private Map<CFDefinition.Name, Integer> orderingIndexes;
+ private boolean selectsStaticColumns;
+ private boolean selectsOnlyStaticColumns;
+
// Used by forSelection below
private static final Parameters defaultParameters = new Parameters(Collections.<ColumnIdentifier, Boolean>emptyMap(), false, false, null, false);
+ private static final Predicate<CFDefinition.Name> isStaticFilter = new Predicate<CFDefinition.Name>()
+ {
+ public boolean apply(CFDefinition.Name name)
+ {
+ return name.kind == CFDefinition.Name.Kind.STATIC;
+ }
+ };
+
public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters parameters, Selection selection, Term limit)
{
this.cfDef = cfDef;
this.boundTerms = boundTerms;
this.selection = selection;
- this.keyRestrictions = new Restriction[cfDef.keys.size()];
- this.columnRestrictions = new Restriction[cfDef.columns.size()];
+ this.keyRestrictions = new Restriction[cfDef.partitionKeyCount()];
+ this.columnRestrictions = new Restriction[cfDef.clusteringColumnsCount()];
this.parameters = parameters;
this.limit = limit;
+
+ // Now gather a few info on whether we should bother with static columns or not for this statement
+ initStaticColumnsInfo();
+ }
+
+ private void initStaticColumnsInfo()
+ {
+ if (!cfDef.cfm.hasStaticColumns())
+ return;
+
+ // If it's a wildcard, we do select static but not only them
+ if (selection.isWildcard())
+ {
+ selectsStaticColumns = true;
+ return;
+ }
+
+ // Otherwise, check the selected columns
+ selectsStaticColumns = !Iterables.isEmpty(Iterables.filter(selection.getColumnsList(), isStaticFilter));
+ selectsOnlyStaticColumns = true;
+ for (CFDefinition.Name name : selection.getColumnsList())
+ {
+ if (name.kind != CFDefinition.Name.Kind.KEY_ALIAS && name.kind != CFDefinition.Name.Kind.STATIC)
+ {
+ selectsOnlyStaticColumns = false;
+ break;
+ }
+ }
}
// Creates a simple select based on the given selection.
@@ -383,35 +422,91 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// to account for the grouping of columns.
// Since that doesn't work for maps/sets/lists, we now use the compositesToGroup option of SliceQueryFilter.
// But we must preserve backward compatibility too (for mixed version cluster that is).
- int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
+ int toGroup = cfDef.isCompact ? -1 : cfDef.clusteringColumnsCount();
List<ByteBuffer> startBounds = getRequestedBound(Bound.START, variables);
List<ByteBuffer> endBounds = getRequestedBound(Bound.END, variables);
assert startBounds.size() == endBounds.size();
+ // Handles fetching static columns. Note that for 2i, the filter is just used to restrict
+ // the part of the index to query so adding the static slice would be useless and confusing.
+ // For 2i, static columns are retrieved in CompositesSearcher with each index hit.
+ ColumnSlice staticSlice = null;
+ if (selectsStaticColumns && !usesSecondaryIndexing)
+ {
+ ColumnNameBuilder staticPrefix = cfDef.cfm.getStaticColumnNameBuilder();
+ // Note: we could use staticPrefix.build() for the start bound, but EMPTY_BYTE_BUFFER gives us the
+ // same effect while saving a few CPU cycles.
+ staticSlice = isReversed
+ ? new ColumnSlice(staticPrefix.buildAsEndOfRange(), ByteBufferUtil.EMPTY_BYTE_BUFFER)
+ : new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, staticPrefix.buildAsEndOfRange());
+
+ // In the case where we only select static columns, we want to really only check the static columns.
+ // So we return early as the rest of that method would actually make us query everything
+ if (selectsOnlyStaticColumns)
+ return sliceFilter(staticSlice, limit, toGroup);
+ }
+
// The case where startBounds == 1 is common enough that it's worth optimizing
- ColumnSlice[] slices;
if (startBounds.size() == 1)
{
ColumnSlice slice = new ColumnSlice(startBounds.get(0), endBounds.get(0));
if (slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
- return null;
- slices = new ColumnSlice[]{slice};
+ return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
+
+ // Merge with the static slice when the two touch; a reversed read must keep the static slice last, a forward read first.
+ return staticSlice == null ? sliceFilter(slice, limit, toGroup)
+ : isReversed ? (slice.includes(cfDef.cfm.comparator.reverseComparator, staticSlice.start) ? sliceFilter(new ColumnSlice(slice.start, staticSlice.finish), limit, toGroup) : sliceFilter(new ColumnSlice[]{ slice, staticSlice }, limit, toGroup))
+ : (slice.includes(cfDef.cfm.comparator, staticSlice.finish) ? sliceFilter(new ColumnSlice(staticSlice.start, slice.finish), limit, toGroup) : sliceFilter(new ColumnSlice[]{ staticSlice, slice }, limit, toGroup));
+ }
+
+ List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
+ for (int i = 0; i < startBounds.size(); i++)
+ {
+ ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
+ if (!slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+ l.add(slice);
+ }
+
+ if (l.isEmpty())
+ return staticSlice == null ? null : sliceFilter(staticSlice, limit, toGroup);
+ if (staticSlice == null)
+ return sliceFilter(l.toArray(new ColumnSlice[l.size()]), limit, toGroup);
+
+ // The slices should not overlap. We know the slices built from startBounds/endBounds don't, but
+ // if there is a static slice, it could overlap with the 2nd slice. Check for it and correct if
+ // that's the case
+ ColumnSlice[] slices;
+ if (isReversed)
+ {
+ if (l.get(l.size() - 1).includes(cfDef.cfm.comparator.reverseComparator, staticSlice.start)) // slice bounds are in reversed order here, so test the overlap in reversed order too
+ {
+ slices = l.toArray(new ColumnSlice[l.size()]);
+ slices[slices.length-1] = new ColumnSlice(slices[slices.length-1].start, ByteBufferUtil.EMPTY_BYTE_BUFFER); // stretch the last slice to the open bound so it also covers the static slice
+ }
+ else
+ {
+ slices = l.toArray(new ColumnSlice[l.size()+1]);
+ slices[slices.length-1] = staticSlice; // no overlap: the static slice goes last in a reversed read
+ }
}
else
{
- List<ColumnSlice> l = new ArrayList<ColumnSlice>(startBounds.size());
- for (int i = 0; i < startBounds.size(); i++)
+ if (l.get(0).includes(cfDef.cfm.comparator, staticSlice.finish))
{
- ColumnSlice slice = new ColumnSlice(startBounds.get(i), endBounds.get(i));
- if (!slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
- l.add(slice);
+ slices = new ColumnSlice[l.size()];
+ slices[0] = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, l.get(0).finish);
+ for (int i = 1; i < l.size(); i++)
+ slices[i] = l.get(i);
+ }
+ else
+ {
+ slices = new ColumnSlice[l.size()+1];
+ slices[0] = staticSlice; // no overlap: the static slice goes first in a forward read
+ for (int i = 0; i < l.size(); i++)
+ slices[i + 1] = l.get(i); // shift by one so slot 0 is not overwritten and the last slot is not left null
}
- if (l.isEmpty())
- return null;
- slices = l.toArray(new ColumnSlice[l.size()]);
}
-
- return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+ return sliceFilter(slices, limit, toGroup);
}
else
{
@@ -423,6 +518,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ private SliceQueryFilter sliceFilter(ColumnSlice slice, int limit, int toGroup)
+ {
+ return sliceFilter(new ColumnSlice[]{ slice }, limit, toGroup);
+ }
+
+ private SliceQueryFilter sliceFilter(ColumnSlice[] slices, int limit, int toGroup)
+ {
+ return new SliceQueryFilter(slices, isReversed, limit, toGroup);
+ }
+
private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException
{
int l = Integer.MAX_VALUE;
@@ -458,7 +563,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
ColumnNameBuilder builder = cfDef.getKeyNameBuilder();
- for (CFDefinition.Name name : cfDef.keys.values())
+ for (CFDefinition.Name name : cfDef.partitionKeys())
{
Restriction r = keyRestrictions[name.position];
assert r != null && !r.isSlice();
@@ -497,7 +602,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
// We deal with IN queries for keys in other places, so we know buildBound will return only one result
- return buildBound(b, cfDef.keys.values(), keyRestrictions, false, cfDef.getKeyNameBuilder(), variables).get(0);
+ return buildBound(b, cfDef.partitionKeys(), keyRestrictions, false, cfDef.getKeyNameBuilder(), variables).get(0);
}
private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
@@ -556,13 +661,15 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private SortedSet<ByteBuffer> getRequestedColumns(List<ByteBuffer> variables) throws InvalidRequestException
{
+ // Note: getRequestedColumns doesn't handle static columns, but due to CASSANDRA-5762
+ // we always do a slice for CQL3 tables, so it's ok to ignore them here
assert !isColumnRange();
ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
- Iterator<ColumnIdentifier> idIter = cfDef.columns.keySet().iterator();
+ Iterator<CFDefinition.Name> idIter = cfDef.clusteringColumns().iterator();
for (Restriction r : columnRestrictions)
{
- ColumnIdentifier id = idIter.next();
+ ColumnIdentifier id = idIter.next().name;
assert r != null && !r.isSlice();
List<ByteBuffer> values = r.values(variables);
@@ -625,15 +732,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
columns.add(builder.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build());
// selected columns
- for (ColumnIdentifier id : selection.regularColumnsToFetch())
+ for (ColumnIdentifier id : selection.regularAndStaticColumnsToFetch())
columns.add(builder.copy().add(id.key).build());
}
else
{
- Iterator<ColumnIdentifier> iter = cfDef.metadata.keySet().iterator();
+ // We know that we're not composite so we can ignore static columns
+ Iterator<CFDefinition.Name> iter = cfDef.regularColumns().iterator();
while (iter.hasNext())
{
- ColumnIdentifier name = iter.next();
+ ColumnIdentifier name = iter.next().name;
ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
ByteBuffer cname = b.add(name.key).build();
columns.add(cname);
@@ -760,7 +868,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
private List<ByteBuffer> getRequestedBound(Bound b, List<ByteBuffer> variables) throws InvalidRequestException
{
assert isColumnRange();
- return buildBound(b, cfDef.columns.values(), columnRestrictions, isReversed, cfDef.getColumnNameBuilder(), variables);
+ return buildBound(b, cfDef.clusteringColumns(), columnRestrictions, isReversed, cfDef.getColumnNameBuilder(), variables);
}
public List<IndexExpression> getIndexExpressions(List<ByteBuffer> variables) throws InvalidRequestException
@@ -781,6 +889,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
restriction = columnRestrictions[name.position];
break;
case COLUMN_METADATA:
+ case STATIC:
restriction = metadataRestrictions.get(name);
break;
default:
@@ -956,6 +1065,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
result.add(c);
break;
case COLUMN_METADATA:
+ case STATIC:
// This should not happen for compact CF
throw new AssertionError();
default:
@@ -979,8 +1089,35 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
builder.add(c);
}
+ Map<CFDefinition.Name, ByteBuffer> staticValues = Collections.emptyMap();
+ // Gather up static values first
+ if (!builder.isEmpty() && builder.firstGroup().isStatic)
+ {
+ staticValues = new HashMap<>();
+ ColumnGroupMap group = builder.firstGroup();
+ for (CFDefinition.Name name : Iterables.filter(selection.getColumnsList(), isStaticFilter))
+ staticValues.put(name, name.type.isCollection() ? getCollectionValue(name, group) : getSimpleValue(name, group));
+ builder.discardFirst();
+
+ // If there were static columns but there is no actual row, then provided the select was a full
+ // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
+ // then we want to include the static columns in the result set.
+ if (!staticValues.isEmpty() && builder.isEmpty() && !usesSecondaryIndexing && hasNoClusteringColumnsRestriction())
+ {
+ result.newRow();
+ for (CFDefinition.Name name : selection.getColumnsList())
+ {
+ if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS)
+ result.add(keyComponents[name.position]);
+ else
+ result.add(name.kind == CFDefinition.Name.Kind.STATIC ? staticValues.get(name) : null);
+ }
+ return;
+ }
+ }
+
for (ColumnGroupMap group : builder.groups())
- handleGroup(selection, result, keyComponents, group);
+ handleGroup(selection, result, keyComponents, group, staticValues);
}
else
{
@@ -999,6 +1136,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
+ private boolean hasNoClusteringColumnsRestriction()
+ {
+ for (int i = 0; i < columnRestrictions.length; i++)
+ if (columnRestrictions[i] != null)
+ return false;
+ return true;
+ }
+
/**
* Orders results when multiple keys are selected (using IN)
*/
@@ -1039,7 +1184,11 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
Collections.sort(cqlRows.rows, new CompositeComparator(types, positions));
}
- private void handleGroup(Selection selection, Selection.ResultSetBuilder result, ByteBuffer[] keyComponents, ColumnGroupMap columns) throws InvalidRequestException
+ private void handleGroup(Selection selection,
+ Selection.ResultSetBuilder result,
+ ByteBuffer[] keyComponents,
+ ColumnGroupMap columns,
+ Map<CFDefinition.Name, ByteBuffer> staticValues) throws InvalidRequestException
{
// Respect requested order
result.newRow();
@@ -1059,21 +1208,32 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
case COLUMN_METADATA:
if (name.type.isCollection())
{
- List<Pair<ByteBuffer, Column>> collection = columns.getCollection(name.name.key);
- ByteBuffer value = collection == null
- ? null
- : ((CollectionType)name.type).serialize(collection);
- result.add(value);
+ result.add(getCollectionValue(name, columns));
}
else
{
result.add(columns.getSimple(name.name.key));
}
break;
+ case STATIC:
+ result.add(staticValues.get(name));
+ break;
}
}
}
+ private static ByteBuffer getCollectionValue(CFDefinition.Name name, ColumnGroupMap columns)
+ {
+ List<Pair<ByteBuffer, Column>> collection = columns.getCollection(name.name.key);
+ return collection == null ? null : ((CollectionType)name.type).serialize(collection);
+ }
+
+ private static ByteBuffer getSimpleValue(CFDefinition.Name name, ColumnGroupMap columns)
+ {
+ Column c = columns.getSimple(name.name.key);
+ return c == null ? null : c.value();
+ }
+
private static boolean isReversedType(CFDefinition.Name name)
{
return name.type instanceof ReversedType;
@@ -1089,6 +1249,14 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
return true;
}
+ private boolean hasClusteringColumnsRestriction()
+ {
+ for (int i = 0; i < columnRestrictions.length; i++)
+ if (columnRestrictions[i] != null)
+ return true;
+ return false;
+ }
+
public static class RawStatement extends CFStatement
{
private final Parameters parameters;
@@ -1122,7 +1290,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
: Selection.fromSelectors(cfDef, selectClause);
if (parameters.isDistinct)
- validateDistinctSelection(selection.getColumnsList(), cfDef.keys.values());
+ validateDistinctSelection(selection.getColumnsList(), cfDef.partitionKeys());
Term prepLimit = null;
if (limit != null)
@@ -1174,6 +1342,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
case VALUE_ALIAS:
throw new InvalidRequestException(String.format("Predicates on the non-primary-key column (%s) of a COMPACT table are not yet supported", name.name));
case COLUMN_METADATA:
+ case STATIC:
// We only all IN on the row key and last clustering key so far, never on non-PK columns, and this even if there's an index
Restriction r = updateRestriction(cfm, name, stmt.metadataRestrictions.get(name), rel, names);
if (r.isIN() && !((Restriction.IN)r).canHaveOnlyOneValue())
@@ -1197,7 +1366,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
boolean canRestrictFurtherComponents = true;
CFDefinition.Name previous = null;
stmt.keyIsInRelation = false;
- Iterator<CFDefinition.Name> iter = cfDef.keys.values().iterator();
+ Iterator<CFDefinition.Name> iter = cfDef.partitionKeys().iterator();
for (int i = 0; i < stmt.keyRestrictions.length; i++)
{
CFDefinition.Name cname = iter.next();
@@ -1266,7 +1435,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
// All (or none) of the partition key columns have been specified;
// hence there is no need to turn these restrictions into index expressions.
if (!stmt.usesSecondaryIndexing)
- stmt.restrictedNames.removeAll(cfDef.keys.values());
+ stmt.restrictedNames.removeAll(cfDef.partitionKeys());
+
+ if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
+ throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
// If a clustering key column is restricted by a non-EQ relation, all preceding
// columns must have a EQ, and all following must have no restriction. Unless
@@ -1274,7 +1446,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
canRestrictFurtherComponents = true;
previous = null;
boolean previousIsSlice = false;
- iter = cfDef.columns.values().iterator();
+ iter = cfDef.clusteringColumns().iterator();
for (int i = 0; i < stmt.columnRestrictions.length; i++)
{
CFDefinition.Name cname = iter.next();
@@ -1332,7 +1504,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
stmt.usesSecondaryIndexing = true;
if (!stmt.usesSecondaryIndexing)
- stmt.restrictedNames.removeAll(cfDef.columns.values());
+ stmt.restrictedNames.removeAll(cfDef.clusteringColumns());
// Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
// there is restrictions not covered by the PK.
@@ -1343,8 +1515,16 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
stmt.usesSecondaryIndexing = true;
}
- if (stmt.usesSecondaryIndexing && stmt.keyIsInRelation)
- throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+ if (stmt.usesSecondaryIndexing)
+ {
+ if (stmt.keyIsInRelation)
+ throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported");
+ // When the user only selects static columns, the intent is that we don't query the whole partition but just
+ // the static parts. But 1) we don't have an easy way to do that with 2i and 2) since we don't support index on static columns
+ // so far, 2i means that you've restricted a non static column, so the query is somewhat non-sensical.
+ if (stmt.selectsOnlyStaticColumns)
+ throw new InvalidRequestException("Queries using 2ndary indexes don't support selecting only static columns");
+ }
if (!stmt.parameters.orderings.isEmpty())
{
@@ -1401,7 +1581,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
}
- Boolean[] reversedMap = new Boolean[cfDef.columns.size()];
+ Boolean[] reversedMap = new Boolean[cfDef.clusteringColumnsCount()];
int i = 0;
for (Map.Entry<ColumnIdentifier, Boolean> entry : stmt.parameters.orderings.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Selection.java b/src/java/org/apache/cassandra/cql3/statements/Selection.java
index 600ee1b..9760311 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Selection.java
@@ -49,6 +49,12 @@ public abstract class Selection
this.collectTTLs = collectTTLs;
}
+ // Overridden by SimpleSelection when appropriate.
+ public boolean isWildcard()
+ {
+ return false;
+ }
+
public ResultSet.Metadata getResultMetadata()
{
return new ResultSet.Metadata(metadata);
@@ -59,12 +65,12 @@ public abstract class Selection
List<CFDefinition.Name> all = new ArrayList<CFDefinition.Name>();
for (CFDefinition.Name name : cfDef)
all.add(name);
- return new SimpleSelection(all);
+ return new SimpleSelection(all, true);
}
public static Selection forColumns(List<CFDefinition.Name> columnsList)
{
- return new SimpleSelection(columnsList);
+ return new SimpleSelection(columnsList, false);
}
private static boolean isUsingFunction(List<RawSelector> rawSelectors)
@@ -105,7 +111,7 @@ public abstract class Selection
CFDefinition.Name name = cfDef.get(tot.id);
if (name == null)
throw new InvalidRequestException(String.format("Undefined name %s in selection clause", tot.id));
- if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+ if (name.isPrimaryKeyColumn())
throw new InvalidRequestException(String.format("Cannot use selection function %s on PRIMARY KEY part %s", tot.isWritetime ? "writeTime" : "ttl", name));
if (name.type.isCollection())
throw new InvalidRequestException(String.format("Cannot use selection function %s on collections", tot.isWritetime ? "writeTime" : "ttl"));
@@ -195,7 +201,7 @@ public abstract class Selection
names.add(name);
metadata.add(rawSelector.alias == null ? name : makeAliasSpec(cfDef, name.type, rawSelector.alias));
}
- return new SimpleSelection(names, metadata);
+ return new SimpleSelection(names, metadata, false);
}
}
@@ -204,12 +210,12 @@ public abstract class Selection
/**
* @return the list of CQL3 "regular" (the "COLUMN_METADATA" ones) column names to fetch.
*/
- public List<ColumnIdentifier> regularColumnsToFetch()
+ public List<ColumnIdentifier> regularAndStaticColumnsToFetch()
{
List<ColumnIdentifier> toFetch = new ArrayList<ColumnIdentifier>();
for (CFDefinition.Name name : columnsList)
{
- if (name.kind == CFDefinition.Name.Kind.COLUMN_METADATA)
+ if (name.kind == CFDefinition.Name.Kind.COLUMN_METADATA || name.kind == CFDefinition.Name.Kind.STATIC)
toFetch.add(name.name);
}
return toFetch;
@@ -307,12 +313,14 @@ public abstract class Selection
// Special cased selection for when no function is used (this save some allocations).
private static class SimpleSelection extends Selection
{
- public SimpleSelection(List<CFDefinition.Name> columnsList)
+ private final boolean isWildcard;
+
+ public SimpleSelection(List<CFDefinition.Name> columnsList, boolean isWildcard)
{
- this(columnsList, new ArrayList<ColumnSpecification>(columnsList));
+ this(columnsList, new ArrayList<ColumnSpecification>(columnsList), isWildcard);
}
- public SimpleSelection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata)
+ public SimpleSelection(List<CFDefinition.Name> columnsList, List<ColumnSpecification> metadata, boolean isWildcard)
{
/*
* In theory, even a simple selection could have multiple time the same column, so we
@@ -320,12 +328,19 @@ public abstract class Selection
* get much duplicate in practice, it's more efficient not to bother.
*/
super(columnsList, metadata, false, false);
+ this.isWildcard = isWildcard;
}
protected List<ByteBuffer> handleRow(ResultSetBuilder rs)
{
return rs.current;
}
+
+ @Override
+ public boolean isWildcard()
+ {
+ return isWildcard;
+ }
}
private interface Selector extends AssignementTestable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index a387962..0e6481b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -35,9 +35,9 @@ public class UpdateStatement extends ModificationStatement
{
private static final Operation setToEmptyOperation = new Constants.Setter(null, new Constants.Value(ByteBufferUtil.EMPTY_BYTE_BUFFER));
- private UpdateStatement(CFMetaData cfm, Attributes attrs)
+ private UpdateStatement(StatementType type, CFMetaData cfm, Attributes attrs)
{
- super(cfm, attrs);
+ super(type, cfm, attrs);
}
public boolean requireFullClusteringKey()
@@ -72,9 +72,9 @@ public class UpdateStatement extends ModificationStatement
if (cfDef.isCompact)
{
if (builder.componentCount() == 0)
- throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.columns.values().iterator().next()));
+ throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s", cfDef.clusteringColumns().iterator().next()));
- if (cfDef.value == null)
+ if (cfDef.compactValue() == null)
{
// compact + no compact value implies there is no column outside the PK. So no operation could
// have passed through validation
@@ -85,7 +85,7 @@ public class UpdateStatement extends ModificationStatement
{
// compact means we don't have a row marker, so don't accept to set only the PK. See CASSANDRA-5648.
if (updates.isEmpty())
- throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfDef.value));
+ throw new InvalidRequestException(String.format("Column %s is mandatory for this COMPACT STORAGE table", cfDef.compactValue()));
for (Operation update : updates)
update.execute(key, cf, builder.copy(), params);
@@ -131,7 +131,7 @@ public class UpdateStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
+ UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.INSERT, cfDef.cfm, attrs);
// Created from an INSERT
if (stmt.isCounter())
@@ -159,10 +159,11 @@ public class UpdateStatement extends ModificationStatement
case COLUMN_ALIAS:
Term t = value.prepare(name);
t.collectMarkerSpecification(boundNames);
- stmt.addKeyValue(name.name, t);
+ stmt.addKeyValue(name, t);
break;
case VALUE_ALIAS:
case COLUMN_METADATA:
+ case STATIC:
Operation operation = new Operation.SetValue(value).prepare(name);
operation.collectMarkerSpecification(boundNames);
stmt.addOperation(operation);
@@ -192,7 +193,7 @@ public class UpdateStatement extends ModificationStatement
Attributes.Raw attrs,
List<Pair<ColumnIdentifier, Operation.RawUpdate>> updates,
List<Relation> whereClause,
- List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions)
+ List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions)
{
super(name, attrs, conditions, false);
this.updates = updates;
@@ -201,7 +202,7 @@ public class UpdateStatement extends ModificationStatement
protected ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException
{
- UpdateStatement stmt = new UpdateStatement(cfDef.cfm, attrs);
+ UpdateStatement stmt = new UpdateStatement(ModificationStatement.StatementType.UPDATE, cfDef.cfm, attrs);
for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : updates)
{
@@ -219,6 +220,7 @@ public class UpdateStatement extends ModificationStatement
throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
case VALUE_ALIAS:
case COLUMN_METADATA:
+ case STATIC:
stmt.addOperation(operation);
break;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index e0c576e..9eff12a 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -63,7 +63,6 @@ public class ColumnSlice
return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
}
-
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 90e7089..eb618f4 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -26,10 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.CompositeType;
@@ -238,11 +235,28 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
// slightly wasteful, but this probably doesn't matter in practice and it simplify things.
- SliceQueryFilter dataFilter = new SliceQueryFilter(start,
- entry.indexedEntryEnd(),
- false,
- Integer.MAX_VALUE,
- baseCfs.metadata.clusteringKeyColumns().size());
+ ColumnSlice dataSlice = new ColumnSlice(start, entry.indexedEntryEnd());
+ ColumnSlice[] slices;
+ if (baseCfs.metadata.hasStaticColumns())
+ {
+ // If the table has static columns, we must fetch them too as they may need to be returned too.
+ // Note that this is potentially wasteful for 2 reasons:
+ // 1) we will retrieve the static parts for each indexed row, even if we have more than one row in
+ // the same partition. If we were to group data queries to rows on the same slice, which would
+ // speed up things in general, we would also optimize here since we would fetch static columns only
+ // once for each group.
+ // 2) at this point we don't know if the user asked for static columns or not, so we might be fetching
+ // them for nothing. We would however need to ship the list of "CQL3 columns selected" with getRangeSlice
+ // to be able to know that.
+ // TODO: we should improve both points above
+ ColumnSlice staticSlice = new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, baseCfs.metadata.getStaticColumnNameBuilder().buildAsEndOfRange());
+ slices = new ColumnSlice[]{ staticSlice, dataSlice };
+ }
+ else
+ {
+ slices = new ColumnSlice[]{ dataSlice };
+ }
+ SliceQueryFilter dataFilter = new SliceQueryFilter(slices, false, Integer.MAX_VALUE, baseCfs.metadata.clusteringKeyColumns().size());
ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp));
if (newData == null || index.isStale(entry, newData, filter.timestamp))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index d002aa7..9250b0f 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -34,6 +34,7 @@ import java.util.List;
*/
public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
{
+
// changes bb position
protected static int getShortLength(ByteBuffer bb)
{
@@ -41,6 +42,13 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
return length | (bb.get() & 0xFF);
}
+ // Doesn't change bb position
+ protected static int getShortLength(ByteBuffer bb, int position)
+ {
+ int length = (bb.get(position) & 0xFF) << 8;
+ return length | (bb.get(position + 1) & 0xFF);
+ }
+
// changes bb position
protected static void putShortLength(ByteBuffer bb, int length)
{
@@ -66,11 +74,17 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
public int compare(ByteBuffer o1, ByteBuffer o2)
{
- if (o1 == null)
- return o2 == null ? 0 : -1;
+ if (o1 == null || !o1.hasRemaining())
+ return o2 == null || !o2.hasRemaining() ? 0 : -1;
ByteBuffer bb1 = o1.duplicate();
ByteBuffer bb2 = o2.duplicate();
+
+ boolean isStatic1 = readIsStatic(bb1);
+ boolean isStatic2 = readIsStatic(bb2);
+ if (isStatic1 != isStatic2)
+ return isStatic1 ? -1 : 1;
+
int i = 0;
ByteBuffer previous = null;
@@ -90,22 +104,9 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
byte b1 = bb1.get();
byte b2 = bb2.get();
- if (b1 < 0)
- {
- if (b2 >= 0)
- return -1;
- }
- else if (b1 > 0)
- {
- if (b2 <= 0)
- return 1;
- }
- else
- {
- // b1 == 0
- if (b2 != 0)
- return -b2;
- }
+ if (b1 != b2)
+ return b1 - b2;
+
++i;
}
@@ -116,6 +117,10 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
return 1;
}
+ // Check if the provided BB represents a static name and advance the
+ // buffer to the real beginning if so.
+ protected abstract boolean readIsStatic(ByteBuffer bb);
+
/**
* Split a composite column names into it's components.
*/
@@ -123,6 +128,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
{
List<ByteBuffer> l = new ArrayList<ByteBuffer>();
ByteBuffer bb = name.duplicate();
+ readIsStatic(bb);
int i = 0;
while (bb.remaining() > 0)
{
@@ -150,6 +156,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
List<CompositeComponent> list = new ArrayList<CompositeComponent>();
ByteBuffer bb = bytes.duplicate();
+ readIsStatic(bb);
int i = 0;
while (bb.remaining() > 0)
@@ -219,8 +226,9 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
{
StringBuilder sb = new StringBuilder();
ByteBuffer bb = bytes.duplicate();
- int i = 0;
+ readIsStatic(bb);
+ int i = 0;
while (bb.remaining() > 0)
{
if (bb.remaining() != bytes.remaining())
@@ -290,6 +298,7 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
public void validate(ByteBuffer bytes) throws MarshalException
{
ByteBuffer bb = bytes.duplicate();
+ readIsStatic(bb);
int i = 0;
ByteBuffer previous = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 2c0e121..83e3b97 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -42,8 +42,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* <component><component><component> ...
* where <component> is:
* <length of value><value><'end-of-component' byte>
- * where <length of value> is a 2 bytes unsigned short the and the
- * 'end-of-component' byte should always be 0 for actual column name.
+ * where <length of value> is a 2 bytes unsigned short (but 0xFFFF is invalid, see
+ * below) and the 'end-of-component' byte should always be 0 for actual column name.
* However, it can set to 1 for query bounds. This allows to query for the
* equivalent of 'give me the full super-column'. That is, if during a slice
* query uses:
@@ -56,9 +56,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* start = <3><"foo".getBytes()><-1>
* allows to query everything that is greater than <3><"foo".getBytes()>, but
* not <3><"foo".getBytes()> itself.
+ *
+ * On top of that, CQL3 uses a specific prefix (0xFFFF) to encode "static columns"
+ * (CASSANDRA-6561). This does mean the maximum size of the first component of a
+ * composite is 65534, not 65535 (or we wouldn't be able to detect if the first 2
+ * bytes are the static prefix or not).
*/
public class CompositeType extends AbstractCompositeType
{
+ public static final int STATIC_MARKER = 0xFFFF;
+
public final List<AbstractType<?>> types;
// interning instances
@@ -74,6 +81,24 @@ public class CompositeType extends AbstractCompositeType
return getInstance(Arrays.<AbstractType<?>>asList(types));
}
+ protected boolean readIsStatic(ByteBuffer bb)
+ {
+ return readStatic(bb);
+ }
+
+ private static boolean readStatic(ByteBuffer bb)
+ {
+ if (bb.remaining() < 2)
+ return false;
+
+ int header = getShortLength(bb, bb.position());
+ if ((header & 0xFFFF) != STATIC_MARKER)
+ return false;
+
+ getShortLength(bb); // Skip header
+ return true;
+ }
+
public static synchronized CompositeType getInstance(List<AbstractType<?>> types)
{
assert types != null && !types.isEmpty();
@@ -149,6 +174,7 @@ public class CompositeType extends AbstractCompositeType
public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
{
bb = bb.duplicate();
+ readStatic(bb);
int i = 0;
while (bb.remaining() > 0)
{
@@ -169,6 +195,11 @@ public class CompositeType extends AbstractCompositeType
return extractComponent(bb, idx);
}
+ public static boolean isStaticName(ByteBuffer bb)
+ {
+ return bb.remaining() >= 2 && (getShortLength(bb, bb.position()) & 0xFFFF) == STATIC_MARKER;
+ }
+
@Override
public int componentsCount()
{
@@ -317,24 +348,33 @@ public class CompositeType extends AbstractCompositeType
private final List<ByteBuffer> components;
private final byte[] endOfComponents;
private int serializedSize;
+ private final boolean isStatic;
public Builder(CompositeType composite)
{
- this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()]);
+ this(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], false);
+ }
+
+ public static Builder staticBuilder(CompositeType composite)
+ {
+ return new Builder(composite, new ArrayList<ByteBuffer>(composite.types.size()), new byte[composite.types.size()], true);
}
- public Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents)
+ private Builder(CompositeType composite, List<ByteBuffer> components, byte[] endOfComponents, boolean isStatic)
{
assert endOfComponents.length == composite.types.size();
this.composite = composite;
this.components = components;
this.endOfComponents = endOfComponents;
+ this.isStatic = isStatic;
+ if (isStatic)
+ serializedSize = 2;
}
private Builder(Builder b)
{
- this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length));
+ this(b.composite, new ArrayList<ByteBuffer>(b.components), Arrays.copyOf(b.endOfComponents, b.endOfComponents.length), b.isStatic);
this.serializedSize = b.serializedSize;
}
@@ -344,6 +384,7 @@ public class CompositeType extends AbstractCompositeType
throw new IllegalStateException("Composite column is already fully constructed");
components.add(bb);
+ serializedSize += 3 + bb.remaining(); // 2 bytes length + 1 byte eoc
return this;
}
@@ -364,20 +405,23 @@ public class CompositeType extends AbstractCompositeType
public ByteBuffer build()
{
- DataOutputBuffer out = new DataOutputBuffer(serializedSize);
- for (int i = 0; i < components.size(); i++)
+ try
{
- try
+ DataOutputBuffer out = new DataOutputBuffer(serializedSize);
+ if (isStatic)
+ out.writeShort(STATIC_MARKER);
+
+ for (int i = 0; i < components.size(); i++)
{
ByteBufferUtil.writeWithShortLength(components.get(i), out);
+ out.write(endOfComponents[i]);
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- out.write(endOfComponents[i]);
+ return ByteBuffer.wrap(out.getData(), 0, out.getLength());
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
- return ByteBuffer.wrap(out.getData(), 0, out.getLength());
}
public ByteBuffer buildAsEndOfRange()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
index 35e6e33..9b56a82 100644
--- a/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DynamicCompositeType.java
@@ -75,6 +75,12 @@ public class DynamicCompositeType extends AbstractCompositeType
this.aliases = aliases;
}
+ protected boolean readIsStatic(ByteBuffer bb)
+ {
+ // DynamicCompositeType (DCT) has no notion of static columns: always report false and leave bb untouched
+ return false;
+ }
+
private AbstractType<?> getComparator(ByteBuffer bb)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 3fb1c5a..65e3be1 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.*;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
@@ -674,11 +675,11 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
// otherwise for CqlStorage, check metadata for classic thrift tables
CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
- for (ColumnIdentifier column : cfDefinition.metadata.keySet())
+ for (CFDefinition.Name column : Iterables.concat(cfDefinition.staticColumns(), cfDefinition.regularColumns()))
{
ColumnDef cDef = new ColumnDef();
- String columnName = column.toString();
- String type = cfDefinition.metadata.get(column).type.toString();
+ String columnName = column.name.toString();
+ String type = column.type.toString();
logger.debug("name: {}, type: {} ", columnName, type);
cDef.name = ByteBufferUtil.bytes(columnName);
cDef.validation_class = type;
@@ -688,12 +689,12 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
// could have already processed it as schema_columnfamilies.value_alias
if (columnDefs.size() == 0 && includeCompactValueColumn)
{
- String value = cfDefinition.value != null ? cfDefinition.value.toString() : null;
+ String value = cfDefinition.compactValue() != null ? cfDefinition.compactValue().toString() : null;
if ("value".equals(value))
{
ColumnDef cDef = new ColumnDef();
cDef.name = ByteBufferUtil.bytes(value);
- cDef.validation_class = cfDefinition.value.type.toString();
+ cDef.validation_class = cfDefinition.compactValue().type.toString();
columnDefs.add(cDef);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index c6db82f..a5156b9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -524,17 +524,17 @@ public class CqlStorage extends AbstractCassandraStorage
if (keys.size() == 0)
{
CFDefinition cfDefinition = getCfDefinition(keyspace, column_family, client);
- for (ColumnIdentifier column : cfDefinition.keys.keySet())
+ for (CFDefinition.Name column : cfDefinition.partitionKeys())
{
- String key = column.toString();
+ String key = column.name.toString();
logger.debug("name: {} ", key);
ColumnDef cDef = new ColumnDef();
cDef.name = ByteBufferUtil.bytes(key);
keys.add(cDef);
}
- for (ColumnIdentifier column : cfDefinition.columns.keySet())
+ for (CFDefinition.Name column : cfDefinition.clusteringColumns())
{
- String key = column.toString();
+ String key = column.name.toString();
logger.debug("name: {} ", key);
ColumnDef cDef = new ColumnDef();
cDef.name = ByteBufferUtil.bytes(key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java
index d4b3e19..c0a2111 100644
--- a/src/java/org/apache/cassandra/service/CASConditions.java
+++ b/src/java/org/apache/cassandra/service/CASConditions.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.InvalidRequestException;
/**
* Abstract the conditions to be fulfilled by a CAS operation.
@@ -34,5 +35,5 @@ public interface CASConditions
* Returns whether the provided CF, that represents the values fetched using the
* readFilter(), match the CAS conditions this object stands for.
*/
- public boolean appliesTo(ColumnFamily current);
+ public boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index f2efc03..2b435ff 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -245,7 +245,8 @@ public class ThriftValidation
continue; // Row marker, ok
ColumnIdentifier columnId = new ColumnIdentifier(CQL3ColumnName, composite.types.get(columnIndex));
- if (cfDef.metadata.get(columnId) == null)
+ CFDefinition.Name columnName = cfDef.get(columnId);
+ if (columnName == null || columnName.isPrimaryKeyColumn())
throw new org.apache.cassandra.exceptions.InvalidRequestException(String.format("Invalid cell for CQL3 table %s. The CQL3 column component (%s) does not correspond to a defined CQL3 column",
metadata.cfName, columnId));
[2/2] git commit: Add static columns in CQL3
Posted by sl...@apache.org.
Add static columns in CQL3
patch by slebresne; reviewed by iamaleksey for CASSANDRA-6561
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09d8769
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09d8769
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09d8769
Branch: refs/heads/cassandra-2.0
Commit: b09d876914ad9c9fdf1af35cf48cdb98c27bbf32
Parents: b2b3055
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Jan 9 18:44:21 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 20 10:15:02 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
doc/cql3/CQL.textile | 29 +-
.../org/apache/cassandra/config/CFMetaData.java | 35 +-
.../cassandra/config/ColumnDefinition.java | 9 +-
.../org/apache/cassandra/cql3/CFDefinition.java | 123 +++++--
.../apache/cassandra/cql3/ColumnCondition.java | 191 +++++++++++
.../org/apache/cassandra/cql3/Constants.java | 5 +-
src/java/org/apache/cassandra/cql3/Cql.g | 29 +-
src/java/org/apache/cassandra/cql3/Lists.java | 7 +-
src/java/org/apache/cassandra/cql3/Maps.java | 8 +-
.../org/apache/cassandra/cql3/Operation.java | 19 ++
src/java/org/apache/cassandra/cql3/Sets.java | 6 +-
.../cassandra/cql3/functions/TokenFct.java | 4 +-
.../cql3/statements/AlterTableStatement.java | 32 +-
.../cql3/statements/BatchStatement.java | 158 +++++++--
.../cql3/statements/CQL3CasConditions.java | 164 +++++++++
.../cql3/statements/ColumnGroupMap.java | 29 +-
.../cql3/statements/CreateIndexStatement.java | 9 +
.../cql3/statements/CreateTableStatement.java | 39 ++-
.../cql3/statements/DeleteStatement.java | 33 +-
.../cql3/statements/ModificationStatement.java | 331 +++++++++++--------
.../cql3/statements/SelectStatement.java | 258 ++++++++++++---
.../cassandra/cql3/statements/Selection.java | 33 +-
.../cql3/statements/UpdateStatement.java | 20 +-
.../apache/cassandra/db/filter/ColumnSlice.java | 1 -
.../db/index/composites/CompositesSearcher.java | 32 +-
.../db/marshal/AbstractCompositeType.java | 47 +--
.../cassandra/db/marshal/CompositeType.java | 72 +++-
.../db/marshal/DynamicCompositeType.java | 6 +
.../hadoop/pig/AbstractCassandraStorage.java | 11 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 8 +-
.../apache/cassandra/service/CASConditions.java | 3 +-
.../cassandra/thrift/ThriftValidation.java | 3 +-
33 files changed, 1384 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4fffb9a..bbacc4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
* Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
* Improve repair tasks(snapshot, differencing) concurrency (CASSANDRA-6566)
* Fix replaying pre-2.0 commit logs (CASSANDRA-6714)
+ * Add static columns to CQL3 (CASSANDRA-6561)
Merged from 1.2:
* Catch memtable flush exceptions during shutdown (CASSANDRA-6735)
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index 03b95e0..d872bde 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -219,7 +219,7 @@ bc(syntax)..
'(' <definition> ( ',' <definition> )* ')'
( WITH <option> ( AND <option>)* )?
-<column-definition> ::= <identifier> <type> ( PRIMARY KEY )?
+<column-definition> ::= <identifier> <type> ( STATIC )? ( PRIMARY KEY )?
| PRIMARY KEY '(' <partition-key> ( ',' <identifier> )* ')'
<partition-key> ::= <partition-key>
@@ -288,11 +288,35 @@ In CQL, the order in which columns are defined for the @PRIMARY KEY@ matters. Th
The remaining columns of the @PRIMARY KEY@ definition, if any, are called __clustering columns. On a given physical node, rows for a given partition key are stored in the order induced by the clustering columns, making the retrieval of rows in that clustering order particularly efficient (see <a href="#selectStmt"><tt>SELECT</tt></a>).
+h4(#createTableStatic). @STATIC@ columns
+
+Some columns can be declared as @STATIC@ in a table definition. A column that is static will be "shared" by all the rows belonging to the same partition (having the same partition key). For instance, in:
+
+bc(sample).
+CREATE TABLE test (
+ pk int,
+ t int,
+ v text,
+ s text static,
+ PRIMARY KEY (pk, t)
+);
+INSERT INTO test(pk, t, v, s) VALUES (0, 0, 'val0', 'static0');
+INSERT INTO test(pk, t, v, s) VALUES (0, 1, 'val1', 'static1');
+SELECT * FROM test WHERE pk=0 AND t=0;
+
+the last query will return @'static1'@ as value for @s@, since @s@ is static and thus the 2nd insertion modified this "shared" value. Note however that static columns are only static within a given partition, and if in the example above both rows were from different partitions (i.e. if they had different values for @pk@), then the 2nd insertion would not have modified the value of @s@ for the first row.
+
+A few restrictions apply to when static columns are allowed:
+* tables with the @COMPACT STORAGE@ option (see below) cannot have them
+* a table without clustering columns cannot have static columns (in a table without clustering columns, every partition has only one row, and so every column is inherently static).
+* only non @PRIMARY KEY@ columns can be static
+
+
h4(#createTableOptions). @<option>@
The @CREATE TABLE@ statement supports a number of options that controls the configuration of a new table. These options can be specified after the @WITH@ keyword.
-The first of these option is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details). The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table. Most notably, @COMPACT STORAGE@ tables cannot have collections and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which imply in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
+The first of these options is @COMPACT STORAGE@. This option is mainly targeted towards backward compatibility for definitions created before CQL3 (see "www.datastax.com/dev/blog/thrift-to-cql3":http://www.datastax.com/dev/blog/thrift-to-cql3 for more details). The option also provides a slightly more compact layout of data on disk but at the price of diminished flexibility and extensibility for the table. Most notably, @COMPACT STORAGE@ tables cannot have collections nor static columns and a @COMPACT STORAGE@ table with at least one clustering column supports exactly one (as in not 0 nor more than 1) column not part of the @PRIMARY KEY@ definition (which implies in particular that you cannot add nor remove columns after creation). For those reasons, @COMPACT STORAGE@ is not recommended outside of the backward compatibility reason evoked above.
Another option is @CLUSTERING ORDER@. It allows to define the ordering of rows on disk. It takes the list of the clustering column names with, for each of them, the on-disk order (Ascending or descending). Note that this option affects "what @ORDER BY@ are allowed during @SELECT@":#selectOrderBy.
@@ -1116,6 +1140,7 @@ The following describes the addition/changes brought for each version of CQL.
h3. 3.1.5
* It is now possible to group clustering columns in a relation, see "SELECT Where clauses":#selectWhere.
+* Added support for @STATIC@ columns, see "static in CREATE TABLE":#createTableStatic.
h3. 3.1.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 714a8bc..a319930 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -27,6 +27,7 @@ import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.ArrayUtils;
@@ -408,6 +409,7 @@ public final class CFMetaData
private volatile List<ColumnDefinition> partitionKeyColumns; // Always of size keyValidator.componentsCount, null padded if necessary
private volatile List<ColumnDefinition> clusteringKeyColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
private volatile Set<ColumnDefinition> regularColumns;
+ private volatile Set<ColumnDefinition> staticColumns;
private volatile ColumnDefinition compactValueColumn;
public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -721,6 +723,16 @@ public final class CFMetaData
return regularColumns;
}
+ public Set<ColumnDefinition> staticColumns()
+ {
+ return staticColumns;
+ }
+
+ public Iterable<ColumnDefinition> regularAndStaticColumns()
+ {
+ return Iterables.concat(staticColumns, regularColumns);
+ }
+
public ColumnDefinition compactValueColumn()
{
return compactValueColumn;
@@ -1328,7 +1340,7 @@ public final class CFMetaData
// Mixing counter with non counter columns is not supported (#2614)
if (defaultValidator instanceof CounterColumnType)
{
- for (ColumnDefinition def : regularColumns)
+ for (ColumnDefinition def : regularAndStaticColumns())
if (!(def.getValidator() instanceof CounterColumnType))
throw new ConfigurationException("Cannot add a non counter column (" + getColumnDefinitionComparator(def).getString(def.name) + ") in a counter column family");
}
@@ -1839,7 +1851,7 @@ public final class CFMetaData
if (column_metadata.get(to) != null)
throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", strFrom, strTo, cfName));
- if (def.type == ColumnDefinition.Type.REGULAR)
+ if (def.type == ColumnDefinition.Type.REGULAR || def.type == ColumnDefinition.Type.STATIC)
{
throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", strFrom));
}
@@ -1883,6 +1895,7 @@ public final class CFMetaData
: comparator.componentsCount() - (hasCollection() ? 2 : 1);
List<ColumnDefinition> ckCols = nullInitializedList(nbCkCols);
Set<ColumnDefinition> regCols = new HashSet<ColumnDefinition>();
+ Set<ColumnDefinition> statCols = new HashSet<ColumnDefinition>();
ColumnDefinition compactCol = null;
for (ColumnDefinition def : column_metadata.values())
@@ -1900,6 +1913,9 @@ public final class CFMetaData
case REGULAR:
regCols.add(def);
break;
+ case STATIC:
+ statCols.add(def);
+ break;
case COMPACT_VALUE:
assert compactCol == null : "There shouldn't be more than one compact value defined: got " + compactCol + " and " + def;
compactCol = def;
@@ -1911,6 +1927,7 @@ public final class CFMetaData
partitionKeyColumns = addDefaultKeyAliases(pkCols);
clusteringKeyColumns = addDefaultColumnAliases(ckCols);
regularColumns = regCols;
+ staticColumns = statCols;
compactValueColumn = addDefaultValueAlias(compactCol, isDense);
}
@@ -2074,6 +2091,20 @@ public final class CFMetaData
return true;
}
+ public boolean hasStaticColumns()
+ {
+ return !staticColumns.isEmpty();
+ }
+
+ public ColumnNameBuilder getStaticColumnNameBuilder()
+ {
+ assert comparator instanceof CompositeType && clusteringKeyColumns().size() > 0;
+ CompositeType.Builder builder = CompositeType.Builder.staticBuilder((CompositeType)comparator);
+ for (int i = 0; i < clusteringKeyColumns().size(); i++)
+ builder.add(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ return builder;
+ }
+
public void validateColumns(Iterable<Column> columns)
{
for (Column column : columns)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 7ca4d45..11340e7 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -64,7 +64,8 @@ public class ColumnDefinition
PARTITION_KEY,
CLUSTERING_KEY,
REGULAR,
- COMPACT_VALUE
+ COMPACT_VALUE,
+ STATIC
}
public final ByteBuffer name;
@@ -96,6 +97,11 @@ public class ColumnDefinition
return new ColumnDefinition(name, validator, componentIndex, Type.REGULAR);
}
+ public static ColumnDefinition staticDef(ByteBuffer name, AbstractType<?> validator, Integer componentIndex)
+ {
+ return new ColumnDefinition(name, validator, componentIndex, Type.STATIC);
+ }
+
public static ColumnDefinition compactValueDef(ByteBuffer name, AbstractType<?> validator)
{
return new ColumnDefinition(name, validator, null, Type.COMPACT_VALUE);
@@ -174,6 +180,7 @@ public class ColumnDefinition
public boolean isThriftCompatible()
{
+ // componentIndex == null should always imply isStatic in practice, but there is no harm in being too careful here.
return type == ColumnDefinition.Type.REGULAR && componentIndex == null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 638770d..b589a95 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -42,11 +42,12 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
public final CFMetaData cfm;
// LinkedHashMap because the order does matter (it is the order in the composite type)
- public final LinkedHashMap<ColumnIdentifier, Name> keys = new LinkedHashMap<ColumnIdentifier, Name>();
- public final LinkedHashMap<ColumnIdentifier, Name> columns = new LinkedHashMap<ColumnIdentifier, Name>();
- public final Name value;
+ private final LinkedHashMap<ColumnIdentifier, Name> partitionKeys = new LinkedHashMap<ColumnIdentifier, Name>();
+ private final LinkedHashMap<ColumnIdentifier, Name> clusteringColumns = new LinkedHashMap<ColumnIdentifier, Name>();
+ private final Name compactValue;
// Keep metadata lexicographically ordered so that wildcard expansion have a deterministic order
- public final Map<ColumnIdentifier, Name> metadata = new TreeMap<ColumnIdentifier, Name>();
+ private final Map<ColumnIdentifier, Name> staticColumns = new TreeMap<ColumnIdentifier, Name>();
+ private final Map<ColumnIdentifier, Name> regularColumns = new TreeMap<ColumnIdentifier, Name>();
public final boolean isComposite;
public final boolean hasCompositeKey;
@@ -65,7 +66,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
for (int i = 0; i < cfm.partitionKeyColumns().size(); ++i)
{
ColumnIdentifier id = new ColumnIdentifier(cfm.partitionKeyColumns().get(i).name, definitionType);
- this.keys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
+ this.partitionKeys.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.KEY_ALIAS, i, cfm.getKeyValidator().getComponents().get(i)));
}
this.isComposite = cfm.comparator instanceof CompositeType;
@@ -74,20 +75,25 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
for (int i = 0; i < cfm.clusteringKeyColumns().size(); ++i)
{
ColumnIdentifier id = new ColumnIdentifier(cfm.clusteringKeyColumns().get(i).name, definitionType);
- this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
+ this.clusteringColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, cfm.comparator.getComponents().get(i)));
}
if (isCompact)
{
- this.value = createValue(cfm);
+ this.compactValue = createValue(cfm);
}
else
{
- this.value = null;
+ this.compactValue = null;
for (ColumnDefinition def : cfm.regularColumns())
{
ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
- this.metadata.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+ this.regularColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_METADATA, def.getValidator()));
+ }
+ for (ColumnDefinition def : cfm.staticColumns())
+ {
+ ColumnIdentifier id = new ColumnIdentifier(def.name, cfm.getColumnDefinitionComparator(def));
+ this.staticColumns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.STATIC, def.getValidator()));
}
}
}
@@ -111,44 +117,86 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
: new Name(cfm.ksName, cfm.cfName, alias, Name.Kind.VALUE_ALIAS, cfm.getDefaultValidator());
}
+ public int partitionKeyCount()
+ {
+ return partitionKeys.size();
+ }
+
+ public Collection<Name> partitionKeys()
+ {
+ return partitionKeys.values();
+ }
+
+ public int clusteringColumnsCount()
+ {
+ return clusteringColumns.size();
+ }
+
+ public Collection<Name> clusteringColumns()
+ {
+ return clusteringColumns.values();
+ }
+
+ public Collection<Name> regularColumns()
+ {
+ return regularColumns.values();
+ }
+
+ /**
+ * Returns the static columns of this table, i.e. the values of the {@code staticColumns} map.
+ *
+ * @return the static column definitions, in the iteration order of the backing map
+ */
+ public Collection<Name> staticColumns()
+ {
+ // Must expose the static columns map: returning regularColumns here would hide every
+ // static column and make callers that combine staticColumns() with regularColumns()
+ // see each regular column twice.
+ return staticColumns.values();
+ }
+
+ public Name compactValue()
+ {
+ return compactValue;
+ }
+
public Name get(ColumnIdentifier name)
{
- CFDefinition.Name kdef = keys.get(name);
- if (kdef != null)
- return kdef;
- if (value != null && name.equals(value.name))
- return value;
- CFDefinition.Name def = columns.get(name);
+ CFDefinition.Name def = partitionKeys.get(name);
if (def != null)
return def;
- return metadata.get(name);
+ if (compactValue != null && name.equals(compactValue.name))
+ return compactValue;
+ def = clusteringColumns.get(name);
+ if (def != null)
+ return def;
+ def = regularColumns.get(name);
+ if (def != null)
+ return def;
+ return staticColumns.get(name);
}
public Iterator<Name> iterator()
{
return new AbstractIterator<Name>()
{
- private final Iterator<Name> keyIter = keys.values().iterator();
- private final Iterator<Name> columnIter = columns.values().iterator();
+ private final Iterator<Name> keyIter = partitionKeys.values().iterator();
+ private final Iterator<Name> clusteringIter = clusteringColumns.values().iterator();
private boolean valueDone;
- private final Iterator<Name> metadataIter = metadata.values().iterator();
+ private final Iterator<Name> staticIter = staticColumns.values().iterator();
+ private final Iterator<Name> regularIter = regularColumns.values().iterator();
protected Name computeNext()
{
if (keyIter.hasNext())
return keyIter.next();
- if (columnIter.hasNext())
- return columnIter.next();
+ if (clusteringIter.hasNext())
+ return clusteringIter.next();
- if (value != null && !valueDone)
+ if (compactValue != null && !valueDone)
{
valueDone = true;
- return value;
+ return compactValue;
}
- if (metadataIter.hasNext())
- return metadataIter.next();
+ if (staticIter.hasNext())
+ return staticIter.next();
+
+ if (regularIter.hasNext())
+ return regularIter.next();
return endOfData();
}
@@ -173,7 +221,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
{
public static enum Kind
{
- KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA
+ KEY_ALIAS, COLUMN_ALIAS, VALUE_ALIAS, COLUMN_METADATA, STATIC
}
private Name(String ksName, String cfName, ColumnIdentifier name, Kind kind, AbstractType<?> type)
@@ -210,20 +258,29 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
{
return Objects.hashCode(ksName, cfName, name, type, kind, position);
}
+
+ public boolean isPrimaryKeyColumn()
+ {
+ return kind == Kind.KEY_ALIAS || kind == Kind.COLUMN_ALIAS;
+ }
}
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append(Joiner.on(", ").join(keys.values()));
- if (!columns.isEmpty())
- sb.append(", ").append(Joiner.on(", ").join(columns.values()));
+ sb.append(Joiner.on(", ").join(partitionKeys.values()));
+ if (!clusteringColumns.isEmpty())
+ sb.append(", ").append(Joiner.on(", ").join(clusteringColumns.values()));
sb.append(" => ");
- if (value != null)
- sb.append(value.name);
- if (!metadata.isEmpty())
- sb.append("{").append(Joiner.on(", ").join(metadata.values())).append(" }");
+ if (compactValue != null)
+ sb.append(compactValue.name);
+ sb.append("{");
+ sb.append(Joiner.on(", ").join(staticColumns.values()));
+ if (!staticColumns.isEmpty())
+ sb.append(", ");
+ sb.append(Joiner.on(", ").join(regularColumns.values()));
+ sb.append("}");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
new file mode 100644
index 0000000..797dba6
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+
+/**
+ * A CQL3 condition on a single column, as parsed from the IF clause of an
+ * UPDATE or DELETE statement.
+ * <p>
+ * Only equality conditions are supported so far. The bind variables used to
+ * evaluate the condition are the ones set through {@link #attach}.
+ */
+public class ColumnCondition
+{
+    public final CFDefinition.Name column;
+    private final Term value;
+
+    // Bind variables used when binding 'value'; set by attach()
+    private List<ByteBuffer> variables;
+
+    private ColumnCondition(CFDefinition.Name column, Term value)
+    {
+        this.column = column;
+        this.value = value;
+    }
+
+    // The only ones we support so far
+    public static ColumnCondition equal(CFDefinition.Name column, Term value)
+    {
+        return new ColumnCondition(column, value);
+    }
+
+    // See CQL3CasConditions for why it's convenient to have this
+    public ColumnCondition attach(List<ByteBuffer> variables)
+    {
+        this.variables = variables;
+        return this;
+    }
+
+    /**
+     * Collects the column specification for the bind variables of this operation.
+     *
+     * @param boundNames the list of column specification where to collect the
+     * bind variables of this term in.
+     */
+    public void collectMarkerSpecification(VariableSpecifications boundNames)
+    {
+        value.collectMarkerSpecification(boundNames);
+    }
+
+    // Not overriding equals() because we need the variables to have been attached when this is
+    // called and so having a non standard method name might help avoid mistakes
+    public boolean equalsTo(ColumnCondition other) throws InvalidRequestException
+    {
+        if (!column.equals(other.column))
+            return false;
+
+        // A bound value can be null (appliesTo() handles that case explicitly), so the
+        // comparison must be null-safe rather than calling equals() on the result directly.
+        ByteBuffer thisValue = value.bindAndGet(variables);
+        ByteBuffer otherValue = other.value.bindAndGet(other.variables);
+        return thisValue == null ? otherValue == null : thisValue.equals(otherValue);
+    }
+
+    /**
+     * Returns the builder to use to build the cell name of the conditioned column: the
+     * static column name builder for a static column, a copy of {@code rowPrefix} otherwise.
+     */
+    private ColumnNameBuilder copyOrUpdatePrefix(CFMetaData cfm, ColumnNameBuilder rowPrefix)
+    {
+        return column.kind == CFDefinition.Name.Kind.STATIC ? cfm.getStaticColumnNameBuilder() : rowPrefix.copy();
+    }
+
+    /**
+     * Validates whether this condition applies to {@code current}.
+     * <p>
+     * For a non-collection column: if the bound value is null, the condition applies when the
+     * column is absent or not live; otherwise it applies when the column is live and its value
+     * equals the bound value. Collection columns are compared element by element.
+     *
+     * @param rowPrefix the name prefix of the row the condition is on (not modified, a copy is used)
+     * @param current the current content to test the condition against
+     * @param now the time used to decide whether a column is live
+     * @return whether the condition holds
+     */
+    public boolean appliesTo(ColumnNameBuilder rowPrefix, ColumnFamily current, long now) throws InvalidRequestException
+    {
+        if (column.type instanceof CollectionType)
+            return collectionAppliesTo((CollectionType)column.type, rowPrefix, current, now);
+
+        Column c = current.getColumn(copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key).build());
+        ByteBuffer v = value.bindAndGet(variables);
+        return v == null
+             ? c == null || !c.isLive(now)
+             : c != null && c.isLive(now) && c.value().equals(v);
+    }
+
+    /**
+     * Collection variant of {@link #appliesTo}: compares the live cells of the collection
+     * with the bound collection value. A null bound value applies only if there are no live cells.
+     */
+    private boolean collectionAppliesTo(CollectionType type, ColumnNameBuilder rowPrefix, ColumnFamily current, final long now) throws InvalidRequestException
+    {
+        ColumnNameBuilder collectionPrefix = copyOrUpdatePrefix(current.metadata(), rowPrefix).add(column.name.key);
+        // We are testing for collection equality, so we need to have the expected values *and* only those.
+        ColumnSlice[] collectionSlice = new ColumnSlice[]{ new ColumnSlice(collectionPrefix.build(), collectionPrefix.buildAsEndOfRange()) };
+        // Filter live columns, this makes things simpler afterwards
+        Iterator<Column> iter = Iterators.filter(current.iterator(collectionSlice), new Predicate<Column>()
+        {
+            public boolean apply(Column c)
+            {
+                // we only care about live columns
+                return c.isLive(now);
+            }
+        });
+
+        Term.Terminal v = value.bind(variables);
+        if (v == null)
+            return !iter.hasNext();
+
+        switch (type.kind)
+        {
+            case LIST: return listAppliesTo(current.metadata(), iter, ((Lists.Value)v).elements);
+            case SET: return setAppliesTo(current.metadata(), iter, ((Sets.Value)v).elements);
+            case MAP: return mapAppliesTo(current.metadata(), iter, ((Maps.Value)v).map);
+        }
+        throw new AssertionError();
+    }
+
+    /**
+     * Returns the last component of the (composite) name of {@code c}, which is used
+     * below as the set element or map key of a collection cell.
+     */
+    private static ByteBuffer collectionKey(CFMetaData cfm, Column c)
+    {
+        ByteBuffer[] bbs = ((CompositeType)cfm.comparator).split(c.name());
+        return bbs[bbs.length - 1];
+    }
+
+    /**
+     * Checks that the cells returned by {@code iter} hold exactly {@code elements}, in order.
+     */
+    private boolean listAppliesTo(CFMetaData cfm, Iterator<Column> iter, List<ByteBuffer> elements)
+    {
+        for (ByteBuffer e : elements)
+            // Fail if we run out of cells or if the current cell differs from the expected element
+            if (!iter.hasNext() || !iter.next().value().equals(e))
+                return false;
+        // We must not have more elements than expected
+        return !iter.hasNext();
+    }
+
+    /**
+     * Checks that the cells returned by {@code iter} correspond exactly to {@code elements}.
+     */
+    private boolean setAppliesTo(CFMetaData cfm, Iterator<Column> iter, Set<ByteBuffer> elements)
+    {
+        Set<ByteBuffer> remaining = new HashSet<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            if (!remaining.remove(collectionKey(cfm, iter.next())))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    /**
+     * Checks that the cells returned by {@code iter} correspond exactly to the entries of {@code elements}.
+     */
+    private boolean mapAppliesTo(CFMetaData cfm, Iterator<Column> iter, Map<ByteBuffer, ByteBuffer> elements)
+    {
+        Map<ByteBuffer, ByteBuffer> remaining = new HashMap<>(elements);
+        while (iter.hasNext())
+        {
+            if (remaining.isEmpty())
+                return false;
+
+            Column c = iter.next();
+            // remove() returns null when the key was not expected: the condition doesn't apply then
+            ByteBuffer expected = remaining.remove(collectionKey(cfm, c));
+            if (expected == null || !expected.equals(c.value()))
+                return false;
+        }
+        return remaining.isEmpty();
+    }
+
+    /**
+     * The parsed (not yet prepared) form of a condition: holds the raw term the column is compared to.
+     */
+    public static class Raw
+    {
+        private final Term.Raw value;
+
+        public Raw(Term.Raw value)
+        {
+            this.value = value;
+        }
+
+        /**
+         * Prepares this raw condition against the column it applies to.
+         *
+         * @param receiver the column the condition is on
+         * @return the prepared equality condition
+         * @throws InvalidRequestException if {@code receiver} is a counter column
+         */
+        public ColumnCondition prepare(CFDefinition.Name receiver) throws InvalidRequestException
+        {
+            if (receiver.type instanceof CounterColumnType)
+                throw new InvalidRequestException("Conditions on counters are not supported");
+
+            return ColumnCondition.equal(receiver, value.prepare(receiver));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Constants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index bcfe00d..f99fd02 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -296,6 +296,7 @@ public abstract class Constants
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
+ prefix = maybeUpdatePrefix(cf.metadata(), prefix);
ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
ByteBuffer value = t.bindAndGet(params.variables);
cf.addColumn(value == null ? params.makeTombstone(cname) : params.makeColumn(cname, value));
@@ -315,6 +316,7 @@ public abstract class Constants
if (bytes == null)
throw new InvalidRequestException("Invalid null value for counter increment");
long increment = ByteBufferUtil.toLong(bytes);
+ prefix = maybeUpdatePrefix(cf.metadata(), prefix);
ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
cf.addCounter(cname, increment);
}
@@ -337,6 +339,7 @@ public abstract class Constants
if (increment == Long.MIN_VALUE)
throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)");
+ prefix = maybeUpdatePrefix(cf.metadata(), prefix);
ByteBuffer cname = columnName == null ? prefix.build() : prefix.add(columnName.key).build();
cf.addCounter(cname, -increment);
}
@@ -356,7 +359,7 @@ public abstract class Constants
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
if (isCollection)
cf.addAtom(params.makeRangeTombstone(column.build(), column.buildAsEndOfRange()));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 6e7cf1c..a11a818 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -351,21 +351,22 @@ updateStatement returns [UpdateStatement.ParsedUpdate expr]
( usingClause[attrs] )?
K_SET columnOperation[operations] (',' columnOperation[operations])*
K_WHERE wclause=whereClause
- ( K_IF conditions=updateCondition )?
+ ( K_IF conditions=updateConditions )?
{
return new UpdateStatement.ParsedUpdate(cf,
attrs,
operations,
wclause,
- conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+ conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
}
;
-updateCondition returns [List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions]
- @init { conditions = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>(); }
- : columnOperation[conditions] ( K_AND columnOperation[conditions] )*
+updateConditions returns [List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+ @init { conditions = new ArrayList<Pair<ColumnIdentifier, ColumnCondition.Raw>>(); }
+ : columnCondition[conditions] ( K_AND columnCondition[conditions] )*
;
+
/**
* DELETE name1, name2
* FROM <CF>
@@ -381,13 +382,13 @@ deleteStatement returns [DeleteStatement.Parsed expr]
K_FROM cf=columnFamilyName
( usingClauseDelete[attrs] )?
K_WHERE wclause=whereClause
- ( K_IF conditions=updateCondition )?
+ ( K_IF conditions=updateConditions )?
{
return new DeleteStatement.Parsed(cf,
attrs,
columnDeletions,
wclause,
- conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions);
+ conditions == null ? Collections.<Pair<ColumnIdentifier, ColumnCondition.Raw>>emptyList() : conditions);
}
;
@@ -484,7 +485,8 @@ cfamDefinition[CreateTableStatement.RawStatement expr]
;
cfamColumns[CreateTableStatement.RawStatement expr]
- : k=cident v=comparatorType { $expr.addDefinition(k, v); } (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
+ : k=cident v=comparatorType { boolean isStatic=false; } (K_STATIC {isStatic = true;})? { $expr.addDefinition(k, v, isStatic); }
+ (K_PRIMARY K_KEY { $expr.addKeyAliases(Collections.singletonList(k)); })?
| K_PRIMARY K_KEY '(' pkDef[expr] (',' c=cident { $expr.addColumnAlias(c); } )* ')'
;
@@ -558,10 +560,11 @@ alterTableStatement returns [AlterTableStatement expr]
AlterTableStatement.Type type = null;
CFPropDefs props = new CFPropDefs();
Map<ColumnIdentifier, ColumnIdentifier> renames = new HashMap<ColumnIdentifier, ColumnIdentifier>();
+ boolean isStatic = false;
}
: K_ALTER K_COLUMNFAMILY cf=columnFamilyName
( K_ALTER id=cident K_TYPE v=comparatorType { type = AlterTableStatement.Type.ALTER; }
- | K_ADD id=cident v=comparatorType { type = AlterTableStatement.Type.ADD; }
+ | K_ADD id=cident v=comparatorType ({ isStatic=true; } K_STATIC)? { type = AlterTableStatement.Type.ADD; }
| K_DROP id=cident { type = AlterTableStatement.Type.DROP; }
| K_WITH properties[props] { type = AlterTableStatement.Type.OPTS; }
| K_RENAME { type = AlterTableStatement.Type.RENAME; }
@@ -569,7 +572,7 @@ alterTableStatement returns [AlterTableStatement expr]
( K_AND idn=cident K_TO toIdn=cident { renames.put(idn, toIdn); } )*
)
{
- $expr = new AlterTableStatement(cf, type, id, v, props, renames);
+ $expr = new AlterTableStatement(cf, type, id, v, props, renames, isStatic);
}
;
@@ -845,6 +848,10 @@ columnOperation[List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations]
}
;
+columnCondition[List<Pair<ColumnIdentifier, ColumnCondition.Raw>> conditions]
+ : key=cident '=' t=term { conditions.add(Pair.create(key, new ColumnCondition.Raw(t))); } // Note: we'll reject duplicates later
+ ;
+
properties[PropertyDefinitions props]
: property[props] (K_AND property[props])*
;
@@ -981,6 +988,7 @@ unreserved_function_keyword returns [String str]
| K_CUSTOM
| K_TRIGGER
| K_DISTINCT
+ | K_STATIC
) { $str = $k.text; }
| t=native_type { $str = t.toString(); }
;
@@ -1084,6 +1092,7 @@ K_NAN: N A N;
K_INFINITY: I N F I N I T Y;
K_TRIGGER: T R I G G E R;
+K_STATIC: S T A T I C;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Lists.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index 4ca5eb3..4ad39db 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -266,7 +266,7 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
// delete + append
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
Appender.doAppend(t, cf, column, params);
}
@@ -309,6 +309,7 @@ public abstract class Lists
throw new InvalidRequestException(String.format("List index %d out of bound, list has size %d", idx, existingList.size()));
ByteBuffer elementName = existingList.get(idx).right.name();
+ // Since we reuse the name we've read, if it's a static column, the static marker will already be set
if (value == null)
{
@@ -336,7 +337,7 @@ public abstract class Lists
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
- doAppend(t, cf, prefix.add(columnName.key), params);
+ doAppend(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
}
static void doAppend(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -376,7 +377,7 @@ public abstract class Lists
long time = PrecisionTime.REFERENCE_TIME - (System.currentTimeMillis() - PrecisionTime.REFERENCE_TIME);
List<ByteBuffer> toAdd = ((Lists.Value)value).elements;
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
for (int i = 0; i < toAdd.size(); i++)
{
ColumnNameBuilder b = i == toAdd.size() - 1 ? column : column.copy();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Maps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java
index 30d796c..c332999 100644
--- a/src/java/org/apache/cassandra/cql3/Maps.java
+++ b/src/java/org/apache/cassandra/cql3/Maps.java
@@ -244,7 +244,7 @@ public abstract class Maps
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
// delete + put
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
Putter.doPut(t, cf, column, params);
}
@@ -274,7 +274,7 @@ public abstract class Maps
if (key == null)
throw new InvalidRequestException("Invalid null map key");
- ByteBuffer cellName = prefix.add(columnName.key).add(key).build();
+ ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(key).build();
if (value == null)
{
@@ -302,7 +302,7 @@ public abstract class Maps
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
- doPut(t, cf, prefix.add(columnName.key), params);
+ doPut(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
}
static void doPut(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -335,7 +335,7 @@ public abstract class Maps
throw new InvalidRequestException("Invalid null map key");
assert key instanceof Constants.Value;
- ByteBuffer cellName = prefix.add(columnName.key).add(((Constants.Value)key).bytes).build();
+ ByteBuffer cellName = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key).add(((Constants.Value)key).bytes).build();
cf.addColumn(params.makeTombstone(cellName));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 00dd046..6bf46b5 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -54,6 +56,23 @@ public abstract class Operation
this.t = t;
}
+    /**
+     * Whether the column operated on is a static column.
+     * <p>
+     * On trunk, Operation stores the ColumnDefinition directly, not just the column name, so
+     * this lookup can be removed there and the ColumnDefinition.isStatic field checked
+     * directly. But for 2.0, it's simpler that way.
+     *
+     * @param cfm the metadata in which the column definition is looked up
+     * @return true if the operation has a column name and that column is defined as STATIC
+     */
+    public boolean isStatic(CFMetaData cfm)
+    {
+        if (columnName != null)
+        {
+            ColumnDefinition definition = cfm.getColumnDefinition(columnName.key);
+            if (definition != null)
+                return definition.type == ColumnDefinition.Type.STATIC;
+        }
+        return false;
+    }
+
+    /**
+     * Returns the name builder to use for this operation: the static column name builder
+     * of {@code cfm} if the column operated on is static, {@code prefix} itself otherwise.
+     */
+    protected ColumnNameBuilder maybeUpdatePrefix(CFMetaData cfm, ColumnNameBuilder prefix)
+    {
+        if (isStatic(cfm))
+            return cfm.getStaticColumnNameBuilder();
+
+        return prefix;
+    }
+
/**
* @return whether the operation requires a read of the previous value to be executed
* (only lists setterByIdx, discard and discardByIdx requires that).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/Sets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Sets.java b/src/java/org/apache/cassandra/cql3/Sets.java
index 0fcb8bf..69bc3d3 100644
--- a/src/java/org/apache/cassandra/cql3/Sets.java
+++ b/src/java/org/apache/cassandra/cql3/Sets.java
@@ -230,7 +230,7 @@ public abstract class Sets
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
// delete + add
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
cf.addAtom(params.makeTombstoneForOverwrite(column.build(), column.buildAsEndOfRange()));
Adder.doAdd(t, cf, column, params);
}
@@ -245,7 +245,7 @@ public abstract class Sets
public void execute(ByteBuffer rowKey, ColumnFamily cf, ColumnNameBuilder prefix, UpdateParameters params) throws InvalidRequestException
{
- doAdd(t, cf, prefix.add(columnName.key), params);
+ doAdd(t, cf, maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key), params);
}
static void doAdd(Term t, ColumnFamily cf, ColumnNameBuilder columnName, UpdateParameters params) throws InvalidRequestException
@@ -283,7 +283,7 @@ public abstract class Sets
? Collections.singleton(((Constants.Value)value).bytes)
: ((Sets.Value)value).elements;
- ColumnNameBuilder column = prefix.add(columnName.key);
+ ColumnNameBuilder column = maybeUpdatePrefix(cf.metadata(), prefix).add(columnName.key);
for (ByteBuffer bb : toDiscard)
{
ByteBuffer cellName = column.copy().add(bb).build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
index ac6b999..4f3ff4a 100644
--- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
+++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java
@@ -52,9 +52,9 @@ public class TokenFct extends AbstractFunction
private static AbstractType[] getKeyTypes(CFMetaData cfm)
{
- AbstractType[] types = new AbstractType[cfm.getCfDef().keys.size()];
+ AbstractType[] types = new AbstractType[cfm.getCfDef().partitionKeyCount()];
int i = 0;
- for (CFDefinition.Name name : cfm.getCfDef().keys.values())
+ for (CFDefinition.Name name : cfm.getCfDef().partitionKeys())
types[i++] = name.type;
return types;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index f740ea6..85b3547 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -47,8 +47,15 @@ public class AlterTableStatement extends SchemaAlteringStatement
public final ColumnIdentifier columnName;
private final CFPropDefs cfProps;
private final Map<ColumnIdentifier, ColumnIdentifier> renames;
+ private final boolean isStatic; // Only for ALTER ADD
- public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, CQL3Type validator, CFPropDefs cfProps, Map<ColumnIdentifier, ColumnIdentifier> renames)
+ public AlterTableStatement(CFName name,
+ Type type,
+ ColumnIdentifier columnName,
+ CQL3Type validator,
+ CFPropDefs cfProps,
+ Map<ColumnIdentifier, ColumnIdentifier> renames,
+ boolean isStatic)
{
super(name);
this.oType = type;
@@ -56,6 +63,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
this.validator = validator; // used only for ADD/ALTER commands
this.cfProps = cfProps;
this.renames = renames;
+ this.isStatic = isStatic;
}
public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException
@@ -79,7 +87,11 @@ public class AlterTableStatement extends SchemaAlteringStatement
{
case ADD:
if (cfDef.isCompact)
- throw new InvalidRequestException("Cannot add new column to a compact CF");
+ throw new InvalidRequestException("Cannot add new column to a COMPACT STORAGE table");
+
+ if (isStatic && !cfDef.isComposite)
+ throw new InvalidRequestException("Static columns are not allowed in COMPACT STORAGE tables");
+
if (name != null)
{
switch (name.kind)
@@ -87,7 +99,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
case KEY_ALIAS:
case COLUMN_ALIAS:
throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with a PRIMARY KEY part", columnName));
- case COLUMN_METADATA:
+ default:
throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
}
}
@@ -117,7 +129,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
Integer componentIndex = cfDef.isComposite
? ((CompositeType)meta.comparator).types.size() - (cfDef.hasCollections ? 2 : 1)
: null;
- cfm.addColumnDefinition(ColumnDefinition.regularDef(columnName.key, type, componentIndex));
+ cfm.addColumnDefinition(isStatic
+ ? ColumnDefinition.staticDef(columnName.key, type, componentIndex)
+ : ColumnDefinition.regularDef(columnName.key, type, componentIndex));
break;
case ALTER:
@@ -178,6 +192,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
cfm.defaultValidator(validator.getType());
break;
case COLUMN_METADATA:
+ case STATIC:
ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
// Thrift allows to change a column validator so CFMetaData.validateCompatibility will let it slide
// if we change to an incompatible type (contrarily to the comparator case). But we don't want to
@@ -196,10 +211,8 @@ public class AlterTableStatement extends SchemaAlteringStatement
break;
case DROP:
- if (cfDef.isCompact)
- throw new InvalidRequestException("Cannot drop columns from a compact CF");
- if (!cfDef.isComposite)
- throw new InvalidRequestException("Cannot drop columns from a non-CQL3 CF");
+ if (cfDef.isCompact || !cfDef.isComposite)
+ throw new InvalidRequestException("Cannot drop columns from a COMPACT STORAGE table");
if (name == null)
throw new InvalidRequestException(String.format("Column %s was not found in table %s", columnName, columnFamily()));
@@ -209,8 +222,9 @@ public class AlterTableStatement extends SchemaAlteringStatement
case COLUMN_ALIAS:
throw new InvalidRequestException(String.format("Cannot drop PRIMARY KEY part %s", columnName));
case COLUMN_METADATA:
+ case STATIC:
ColumnDefinition toDelete = null;
- for (ColumnDefinition columnDef : cfm.regularColumns())
+ for (ColumnDefinition columnDef : cfm.regularAndStaticColumns())
{
if (columnDef.name.equals(columnName.key))
toDelete = columnDef;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6151490..d4acbae 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.Iterables;
import org.github.jamm.MemoryMeter;
import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
@@ -47,6 +47,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
public final Type type;
private final List<ModificationStatement> statements;
private final Attributes attrs;
+ private final boolean hasConditions;
/**
* Creates a new BatchStatement from a list of statements and a
@@ -58,10 +59,16 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
*/
public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
{
+ this(boundTerms, type, statements, attrs, false);
+ }
+
+ public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs, boolean hasConditions)
+ {
this.boundTerms = boundTerms;
this.type = type;
this.statements = statements;
this.attrs = attrs;
+ this.hasConditions = hasConditions;
}
public long measureForPreparedCache(MemoryMeter meter)
@@ -103,25 +110,15 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
return statements;
}
- private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
- throws RequestExecutionException, RequestValidationException
- {
- Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
- for (ModificationStatement statement : statements)
- addStatementMutations(statement, variables, local, cl, now, mutations);
-
- return mutations.values();
- }
-
- private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now)
+ private Collection<? extends IMutation> getMutations(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
throws RequestExecutionException, RequestValidationException
{
Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>();
for (int i = 0; i < statements.size(); i++)
{
ModificationStatement statement = statements.get(i);
- List<ByteBuffer> statementVariables = variables.get(i);
- addStatementMutations(statement, statementVariables, false, cl, now, mutations);
+ List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+ addStatementMutations(statement, statementVariables, local, cl, now, mutations);
}
return mutations.values();
}
@@ -156,31 +153,132 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
if (options.getConsistency() == null)
throw new InvalidRequestException("Invalid empty consistency level");
- execute(getMutations(options.getValues(), false, options.getConsistency(), queryState.getTimestamp()), options.getConsistency());
- return null;
+ return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), queryState.getTimestamp());
}
- public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
{
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
- execute(getMutations(variables, cl, queryState.getTimestamp()), cl);
+ return execute(new BatchOfPreparedVariables(variables), false, cl, queryState.getTimestamp());
+ }
+
+ /**
+ * Executes the batch with the given bound values.
+ *
+ * A batch with conditions is delegated to executeWithConditions() and its result is returned.
+ * Otherwise the mutations are built with getMutations() and applied with
+ * executeWithoutConditions(), and null is returned.
+ *
+ * Note that the local flag is only forwarded to getMutations(); the conditional path does not use it.
+ *
+ * @param variables bound values, looked up per statement position
+ * @param local forwarded to getMutations() on the unconditional path
+ * @param cl consistency level used on both paths
+ * @param now timestamp forwarded to either path
+ * @return the result of executeWithConditions() for a conditional batch, null otherwise
+ */
+ public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, long now)
+ throws RequestExecutionException, RequestValidationException
+ {
+ // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
+ // We'll need to fix that.
+ if (hasConditions)
+ return executeWithConditions(variables, cl, ConsistencyLevel.SERIAL, now);
+
+ executeWithoutConditions(getMutations(variables, local, cl, now), cl);
+ return null;
}
- private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+ /**
+ * Applies the given mutations through StorageProxy.mutateWithTriggers at consistency level cl.
+ */
+ private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
{
+ // The atomic flag is only set for a LOGGED batch that produced more than one mutation.
boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
}
+ /**
+ * Executes a batch that contains at least one conditional statement as a single CAS operation.
+ *
+ * The keyspace, table and partition key are taken from the first statement; every following
+ * statement must resolve to that same key. Updates from all statements are accumulated into one
+ * ColumnFamily and conditions into one CQL3CasConditions, which are then handed to StorageProxy.cas.
+ * (The prepare step of a conditional batch already rejects statements on different tables.)
+ *
+ * @param variables bound values, looked up per statement position
+ * @param cl consistency level passed to StorageProxy.cas and used when building the mutations of
+ * unconditional statements
+ * @param serialCl serial consistency level passed to StorageProxy.cas
+ * @param now timestamp passed to attrs.getTimestamp() and to the CQL3CasConditions
+ * @return the rows built by ModificationStatement.buildCasResultSet from the CAS result
+ * @throws InvalidRequestException if a statement resolves to more than one partition key, or to a
+ * key different from the one of the first statement
+ */
+ private ResultMessage executeWithConditions(BatchVariables variables, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
+ throws RequestExecutionException, RequestValidationException
+ {
+ ByteBuffer key = null;
+ String ksName = null;
+ String cfName = null;
+ ColumnFamily updates = null;
+ CQL3CasConditions conditions = null;
+ // Set to null below once any statement uses IF NOT EXISTS.
+ Set<ColumnIdentifier> columnsWithConditions = new LinkedHashSet<ColumnIdentifier>();
+
+ for (int i = 0; i < statements.size(); i++)
+ {
+ ModificationStatement statement = statements.get(i);
+ List<ByteBuffer> statementVariables = variables.getVariablesForStatement(i);
+ long timestamp = attrs.getTimestamp(now, statementVariables);
+ List<ByteBuffer> pks = statement.buildPartitionKeyNames(statementVariables);
+ // This is a client error, so report it like the other partition check below rather than
+ // with an unchecked IllegalArgumentException.
+ if (pks.size() > 1)
+ throw new InvalidRequestException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)");
+ if (key == null)
+ {
+ // First statement: it fixes the partition the whole batch operates on.
+ key = pks.get(0);
+ ksName = statement.cfm.ksName;
+ cfName = statement.cfm.cfName;
+ conditions = new CQL3CasConditions(statement.cfm, now);
+ updates = UnsortedColumns.factory.create(statement.cfm);
+ }
+ else if (!key.equals(pks.get(0)))
+ {
+ throw new InvalidRequestException("Batch with conditions cannot span multiple partitions");
+ }
+
+ if (statement.hasConditions())
+ {
+ ColumnNameBuilder clusteringPrefix = statement.createClusteringPrefixBuilder(statementVariables);
+ statement.addUpdatesAndConditions(key, clusteringPrefix, updates, conditions, statementVariables, timestamp);
+ // As soon as we have an IF NOT EXISTS, we set columnsWithConditions to null so that everything is in the resultSet
+ if (statement.hasIfNotExistCondition())
+ columnsWithConditions = null;
+ else if (columnsWithConditions != null)
+ Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions());
+ }
+ else
+ {
+ // The pks.size() check above already rejected statements with more than one key, so only the
+ // first mutation and its first column family are merged into the updates.
+ IMutation mut = statement.getMutations(statementVariables, false, cl, timestamp, true).iterator().next();
+ updates.resolve(mut.getColumnFamilies().iterator().next());
+ }
+ }
+
+ ColumnFamily result = StorageProxy.cas(ksName, cfName, key, conditions, updates, serialCl, cl);
+ return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, key, cfName, result, columnsWithConditions, true));
+ }
+
public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
{
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
+ assert !hasConditions;
+
+ for (IMutation mutation : getMutations(new PreparedBatchVariables(Collections.<ByteBuffer>emptyList()), true, null, queryState.getTimestamp()))
mutation.apply();
return null;
}
+ /**
+ * Source of the bound values for the statements of a batch.
+ */
+ public interface BatchVariables
+ {
+ /**
+ * Returns the bound values to use for one statement of the batch.
+ *
+ * @param statementInBatch position of the statement in the batch's list of statements
+ */
+ public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
+ }
+
+ /**
+ * BatchVariables backed by a single list of values shared by every statement of the batch.
+ */
+ public static class PreparedBatchVariables implements BatchVariables
+ {
+ private final List<ByteBuffer> variables;
+
+ public PreparedBatchVariables(List<ByteBuffer> variables)
+ {
+ this.variables = variables;
+ }
+
+ /**
+ * Returns the shared list; the statement position is ignored.
+ */
+ public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+ {
+ return variables;
+ }
+ }
+
+ /**
+ * BatchVariables backed by one list of values per statement, indexed by statement position.
+ */
+ public static class BatchOfPreparedVariables implements BatchVariables
+ {
+ private final List<List<ByteBuffer>> variables;
+
+ public BatchOfPreparedVariables(List<List<ByteBuffer>> variables)
+ {
+ this.variables = variables;
+ }
+
+ /**
+ * Returns the list stored at the given statement position.
+ */
+ public List<ByteBuffer> getVariablesForStatement(int statementInBatch)
+ {
+ return variables.get(statementInBatch);
+ }
+ }
+
public String toString()
{
return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
@@ -212,11 +310,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
VariableSpecifications boundNames = getBoundVariables();
List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+ boolean hasConditions = false;
for (ModificationStatement.Parsed parsed : parsedStatements)
{
ModificationStatement stmt = parsed.prepare(boundNames);
if (stmt.hasConditions())
- throw new InvalidRequestException("Conditional updates are not allowed in batches");
+ hasConditions = true;
if (stmt.isCounter() && type != Type.COUNTER)
throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -227,10 +326,23 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
statements.add(stmt);
}
+ if (hasConditions)
+ {
+ String ksName = null;
+ String cfName = null;
+ for (ModificationStatement stmt : statements)
+ {
+ if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
+ throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
+ ksName = stmt.keyspace();
+ cfName = stmt.columnFamily();
+ }
+ }
+
Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
prepAttrs.collectMarkerSpecification(boundNames);
- return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs), boundNames);
+ return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
new file mode 100644
index 0000000..194ff0c
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasConditions.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.CASConditions;
+
+/**
+ * Processed CAS conditions on potentially multiple rows of the same partition.
+ *
+ * Conditions are registered per row (identified by its prefix) through addNotExist() and
+ * addConditions(). readFilter() builds a slice filter with one slice per row that has a condition,
+ * and appliesTo() checks every registered row condition against a given ColumnFamily.
+ */
+public class CQL3CasConditions implements CASConditions
+{
+ private final CFMetaData cfm;
+ // Timestamp handed to every row condition; used for liveness checks and column condition evaluation.
+ private final long now;
+
+ // We index RowCondition by the prefix of the row they applied to for 2 reasons:
+ // 1) this allows to keep things sorted to build the ColumnSlice array below
+ // 2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row)
+ private final SortedMap<ByteBuffer, RowCondition> conditions;
+
+ /**
+ * Creates an empty set of conditions whose rows are sorted with cfm.comparator.
+ */
+ public CQL3CasConditions(CFMetaData cfm, long now)
+ {
+ this.cfm = cfm;
+ this.now = now;
+ this.conditions = new TreeMap<>(cfm.comparator);
+ }
+
+ /**
+ * Registers an IF NOT EXISTS condition for the row identified by prefix.
+ *
+ * The new condition replaces any entry already stored for that row before the check is made.
+ *
+ * @throws InvalidRequestException if the row already had a condition that is not a NotExistCondition
+ */
+ public void addNotExist(ColumnNameBuilder prefix) throws InvalidRequestException
+ {
+ RowCondition previous = conditions.put(prefix.build(), new NotExistCondition(prefix, now));
+ if (previous != null && !(previous instanceof NotExistCondition))
+ throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+ }
+
+ /**
+ * Adds column conditions for the row identified by prefix, creating the row entry if needed.
+ *
+ * @param conds the column conditions to add
+ * @param variables the bound values that get attached to each added condition
+ * @throws InvalidRequestException if the row already has a condition that is not a ColumnsConditions,
+ * or if a condition is rejected as duplicate and incompatible
+ */
+ public void addConditions(ColumnNameBuilder prefix, Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ {
+ ByteBuffer b = prefix.build();
+ RowCondition condition = conditions.get(b);
+ if (condition == null)
+ {
+ condition = new ColumnsConditions(prefix, now);
+ conditions.put(b, condition);
+ }
+ else if (!(condition instanceof ColumnsConditions))
+ {
+ throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row");
+ }
+ ((ColumnsConditions)condition).addConditions(conds, variables);
+ }
+
+ /**
+ * Builds a slice filter with one slice per row that has a condition, in the order of the sorted map.
+ * Asserts that at least one condition has been registered.
+ */
+ public IDiskAtomFilter readFilter()
+ {
+ assert !conditions.isEmpty();
+ ColumnSlice[] slices = new ColumnSlice[conditions.size()];
+ int i = 0;
+ // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists
+ // but all values on which there were conditions are null" and "row doesn't exist", and we can't rely on the
+ // row marker for that (see #6623)
+ for (Map.Entry<ByteBuffer, RowCondition> entry : conditions.entrySet())
+ slices[i++] = new ColumnSlice(entry.getKey(), entry.getValue().rowPrefix.buildAsEndOfRange());
+
+ return new SliceQueryFilter(slices, false, slices.length, cfm.clusteringKeyColumns().size());
+ }
+
+ /**
+ * Returns true if every registered row condition applies to the given ColumnFamily, stopping at
+ * the first one that does not.
+ */
+ public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+ {
+ for (RowCondition condition : conditions.values())
+ {
+ if (!condition.appliesTo(current))
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Base class of the condition(s) registered for one row; holds the row prefix and the timestamp.
+ */
+ private static abstract class RowCondition
+ {
+ public final ColumnNameBuilder rowPrefix;
+ protected final long now;
+
+ protected RowCondition(ColumnNameBuilder rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
+
+ public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+ }
+
+ /**
+ * Condition that holds when the row has no live column.
+ */
+ private static class NotExistCondition extends RowCondition
+ {
+ private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ /**
+ * Returns true if current is null or if no column in the slice of this row is live at "now".
+ */
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return true;
+
+ Iterator<Column> iter = current.iterator(new ColumnSlice[]{ new ColumnSlice(rowPrefix.build(), rowPrefix.buildAsEndOfRange()) });
+ while (iter.hasNext())
+ if (iter.next().isLive(now))
+ return false;
+ return true;
+ }
+ }
+
+ /**
+ * Column conditions of one row, keyed by the name of the column each condition is on.
+ */
+ private static class ColumnsConditions extends RowCondition
+ {
+ private final Map<ColumnIdentifier, ColumnCondition> conditions = new HashMap<>();
+
+ private ColumnsConditions(ColumnNameBuilder rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ /**
+ * Stores each condition (with the variables attached) under its column name.
+ *
+ * @throws InvalidRequestException if the column already had a condition for which
+ * equalsTo() returns false against the new one
+ */
+ public void addConditions(Collection<ColumnCondition> conds, List<ByteBuffer> variables) throws InvalidRequestException
+ {
+ for (ColumnCondition condition : conds)
+ {
+ // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a
+ // different list of variables. So attach them to the condition directly, it's not particularly elegant but it's simpler
+ ColumnCondition previous = conditions.put(condition.column.name, condition.attach(variables));
+ // If 2 conditions are actually equal, let it slide
+ if (previous != null && !previous.equalsTo(condition))
+ throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name);
+ }
+ }
+
+ /**
+ * Returns true if every stored column condition applies to this row in current.
+ * When current is null, returns true only if no condition is stored.
+ */
+ public boolean appliesTo(ColumnFamily current) throws InvalidRequestException
+ {
+ if (current == null)
+ return conditions.isEmpty();
+
+ for (ColumnCondition condition : conditions.values())
+ if (!condition.appliesTo(rowPrefix, current, now))
+ return false;
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
index 8974523..5c3fcb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -32,10 +32,12 @@ public class ColumnGroupMap
{
private final ByteBuffer[] fullPath;
private final Map<ByteBuffer, Value> map = new HashMap<ByteBuffer, Value>();
+ public final boolean isStatic; // Whether or not the group correspond to "static" cells
- private ColumnGroupMap(ByteBuffer[] fullPath)
+ private ColumnGroupMap(ByteBuffer[] fullPath, boolean isStatic)
{
this.fullPath = fullPath;
+ this.isStatic = isStatic;
}
private void add(ByteBuffer[] fullName, int idx, Column column)
@@ -126,7 +128,7 @@ public class ColumnGroupMap
if (currentGroup == null)
{
- currentGroup = new ColumnGroupMap(current);
+ currentGroup = new ColumnGroupMap(current, composite.isStaticName(c.name()));
currentGroup.add(current, idx, c);
previous = current;
return;
@@ -135,7 +137,8 @@ public class ColumnGroupMap
if (!isSameGroup(current))
{
groups.add(currentGroup);
- currentGroup = new ColumnGroupMap(current);
+ // Note that we know that only the first group built can be static
+ currentGroup = new ColumnGroupMap(current, false);
}
currentGroup.add(current, idx, c);
previous = current;
@@ -167,5 +170,25 @@ public class ColumnGroupMap
}
return groups;
}
+
+ /**
+ * Returns true when there is neither a group in progress nor any group in the list.
+ */
+ public boolean isEmpty()
+ {
+ return currentGroup == null && groups.isEmpty();
+ }
+
+ /**
+ * Returns the first group of the list.
+ *
+ * If a group is still in progress it is first appended to the list and the in-progress
+ * reference is cleared, so this call can change the builder's state.
+ * NOTE(review): groups.get(0) is called unconditionally; presumably callers check isEmpty() first, confirm.
+ */
+ public ColumnGroupMap firstGroup()
+ {
+ if (currentGroup != null)
+ {
+ groups.add(currentGroup);
+ currentGroup = null;
+ }
+ return groups.get(0);
+ }
+
+ /**
+ * Removes the group at index 0 of the list. A group still in progress is not touched.
+ */
+ public void discardFirst()
+ {
+ groups.remove(0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 3ef6f5a..376fa4a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -87,6 +87,15 @@ public class CreateIndexStatement extends SchemaAlteringStatement
if (cfm.getCfDef().isCompact && cd.type != ColumnDefinition.Type.REGULAR)
throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.type, columnName));
+ // It would be possible to support 2ndary index on static columns (but not without modifications of at least ExtendedFilter and
+ // CompositesIndex) and maybe we should, but that means a query like:
+ // SELECT * FROM foo WHERE static_column = 'bar'
+ // would pull the full partition every time the static column of partition is 'bar', which sounds like offering a
+ // fair potential for foot-shooting, so I prefer leaving that to a follow up ticket once we have identified cases where
+ // such indexing is actually useful.
+ if (cd.type == ColumnDefinition.Type.STATIC)
+ throw new InvalidRequestException("Secondary indexes are not allowed on static columns");
+
if (cd.getValidator().isCollection() && !properties.isCustom)
throw new InvalidRequestException("Indexes on collections are no yet supported");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09d8769/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 74f7570..632194c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -52,14 +52,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
private ByteBuffer valueAlias;
private final Map<ColumnIdentifier, AbstractType> columns = new HashMap<ColumnIdentifier, AbstractType>();
+ private final Set<ColumnIdentifier> staticColumns;
private final CFPropDefs properties;
private final boolean ifNotExists;
- public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists)
+ public CreateTableStatement(CFName name, CFPropDefs properties, boolean ifNotExists, Set<ColumnIdentifier> staticColumns)
{
super(name);
this.properties = properties;
this.ifNotExists = ifNotExists;
+ this.staticColumns = staticColumns;
try
{
@@ -101,7 +103,10 @@ public class CreateTableStatement extends SchemaAlteringStatement
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
{
- columnDefs.put(col.getKey().key, ColumnDefinition.regularDef(col.getKey().key, col.getValue(), componentIndex));
+ ColumnIdentifier id = col.getKey();
+ columnDefs.put(id.key, staticColumns.contains(id)
+ ? ColumnDefinition.staticDef(id.key, col.getValue(), componentIndex)
+ : ColumnDefinition.regularDef(id.key, col.getValue(), componentIndex));
}
return columnDefs;
@@ -166,6 +171,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
private final List<List<ColumnIdentifier>> keyAliases = new ArrayList<List<ColumnIdentifier>>();
private final List<ColumnIdentifier> columnAliases = new ArrayList<ColumnIdentifier>();
private final Map<ColumnIdentifier, Boolean> definedOrdering = new LinkedHashMap<ColumnIdentifier, Boolean>(); // Insertion ordering is important
+ private final Set<ColumnIdentifier> staticColumns = new HashSet<ColumnIdentifier>();
private boolean useCompactStorage;
private final Multiset<ColumnIdentifier> definedNames = HashMultiset.create(1);
@@ -195,7 +201,7 @@ public class CreateTableStatement extends SchemaAlteringStatement
properties.validate();
- CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists);
+ CreateTableStatement stmt = new CreateTableStatement(cfName, properties, ifNotExists, staticColumns);
Map<ByteBuffer, CollectionType> definedCollections = null;
for (Map.Entry<ColumnIdentifier, CQL3Type> entry : definitions.entrySet())
@@ -225,6 +231,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
AbstractType<?> t = getTypeAndRemove(stmt.columns, alias);
if (t instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+ if (staticColumns.contains(alias))
+ throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
keyTypes.add(t);
}
stmt.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes);
@@ -260,10 +268,13 @@ public class CreateTableStatement extends SchemaAlteringStatement
{
if (definedCollections != null)
throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
- stmt.columnAliases.add(columnAliases.get(0).key);
- stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
+ ColumnIdentifier alias = columnAliases.get(0);
+ stmt.columnAliases.add(alias.key);
+ stmt.comparator = getTypeAndRemove(stmt.columns, alias);
if (stmt.comparator instanceof CounterColumnType)
- throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", stmt.columnAliases.get(0)));
+ throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", alias));
+ if (staticColumns.contains(alias))
+ throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", alias));
}
else
{
@@ -275,6 +286,8 @@ public class CreateTableStatement extends SchemaAlteringStatement
AbstractType<?> type = getTypeAndRemove(stmt.columns, t);
if (type instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
+ if (staticColumns.contains(t))
+ throw new InvalidRequestException(String.format("Static column %s cannot be part of the PRIMARY KEY", t));
types.add(type);
}
@@ -298,6 +311,16 @@ public class CreateTableStatement extends SchemaAlteringStatement
}
}
+ if (!staticColumns.isEmpty())
+ {
+ // Only CQL3 tables can have static columns
+ if (useCompactStorage)
+ throw new InvalidRequestException("Static columns are not supported in COMPACT STORAGE tables");
+ // Static columns only make sense if we have at least one clustering column. Otherwise everything is static anyway
+ if (columnAliases.isEmpty())
+ throw new InvalidRequestException("Static columns are only useful (and thus allowed) if the table has at least one clustering column");
+ }
+
if (useCompactStorage && !stmt.columnAliases.isEmpty())
{
if (stmt.columns.isEmpty())
@@ -373,10 +396,12 @@ public class CreateTableStatement extends SchemaAlteringStatement
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
}
- public void addDefinition(ColumnIdentifier def, CQL3Type type)
+ public void addDefinition(ColumnIdentifier def, CQL3Type type, boolean isStatic)
{
definedNames.add(def);
definitions.put(def, type);
+ if (isStatic)
+ staticColumns.add(def);
}
public void addKeyAliases(List<ColumnIdentifier> aliases)